1
0
mirror of https://github.com/danog/amp.git synced 2025-01-22 05:11:42 +01:00

Clear watchers on run() loop exit, throw on invalid run/tick/stop calls

This commit is contained in:
Daniel Lowrey 2015-08-04 11:06:49 -04:00
parent 5ed70577fc
commit 98f609a99a
6 changed files with 250 additions and 100 deletions

View File

@ -16,14 +16,14 @@ class EvReactor implements Reactor {
private $immediates = [];
private $watcherCallback;
private $keepAliveCount = 0;
private $isRunning = false;
private $state = self::STOPPED;
private $stopException;
private $onError;
private $onCoroutineResolution;
public function __construct($flags = null) {
// @codeCoverageIgnoreStart
if (!extension_loaded("ev")) {
if (!\extension_loaded("ev")) {
throw new \RuntimeException(
"The pecl ev extension is required to use " . __CLASS__
);
@ -80,34 +80,44 @@ class EvReactor implements Reactor {
* {@inheritdoc}
*/
public function run(callable $onStart = null) {
if ($this->isRunning) {
return;
if ($this->state !== self::STOPPED) {
throw new \LogicException(
"Cannot run() recursively; event reactor already active"
);
}
$this->isRunning = true;
if ($onStart) {
$this->state = self::STARTING;
$onStartWatcherId = $this->immediately($onStart);
$this->tryImmediate($this->watchers[$onStartWatcherId]);
if (empty($this->keepAliveCount)) {
$this->isRunning = false;
return;
if (empty($this->keepAliveCount) && empty($this->stopException)) {
$this->state = self::STOPPED;
}
} else {
$this->state = self::RUNNING;
}
while ($this->isRunning) {
while ($this->state > self::STOPPED) {
$immediates = $this->immediates;
foreach ($immediates as $watcher) {
if (!$this->tryImmediate($watcher)) {
break;
}
}
if (!$this->isRunning || empty($this->keepAliveCount)) {
if (empty($this->keepAliveCount) || $this->state <= self::STOPPED) {
break;
}
$flags = $this->immediates ? (Ev::RUN_ONCE | Ev::RUN_NOWAIT) : Ev::RUN_ONCE;
$this->loop->run($flags);
}
if ($this->watchers) {
foreach (array_keys($this->watchers) as $watcherId) {
$this->cancel($watcherId);
}
}
$this->state = self::STOPPED;
if ($this->stopException) {
$e = $this->stopException;
$this->stopException = null;
@ -133,7 +143,7 @@ class EvReactor implements Reactor {
$this->onCallbackError($e);
}
return $this->isRunning;
return $this->state;
}
/**
@ -171,9 +181,15 @@ class EvReactor implements Reactor {
* {@inheritdoc}
*/
public function tick($noWait = false) {
$noWait = (bool) $noWait;
$this->isRunning = true;
if ($this->state) {
throw new \LogicException(
"Cannot tick() recursively; event reactor already active"
);
}
$this->state = self::TICKING;
$noWait = (bool) $noWait;
$immediates = $this->immediates;
foreach ($immediates as $watcher) {
if (!$this->tryImmediate($watcher)) {
@ -181,12 +197,13 @@ class EvReactor implements Reactor {
}
}
if ($this->isRunning) {
// Check the conditional again because a manual stop() could've changed the state
if ($this->state) {
$flags = $noWait || $this->immediates ? Ev::RUN_NOWAIT | Ev::RUN_ONCE : Ev::RUN_ONCE;
$this->loop->run($flags);
$this->isRunning = false;
}
$this->state = self::STOPPED;
if ($this->stopException) {
$e = $this->stopException;
$this->stopException = null;
@ -198,8 +215,14 @@ class EvReactor implements Reactor {
* {@inheritDoc}
*/
public function stop() {
$this->loop->stop();
$this->isRunning = false;
if ($this->state !== self::STOPPED) {
$this->loop->stop();
$this->state = self::STOPPING;
} else {
throw new \LogicException(
"Cannot stop(); event reactor not currently active"
);
}
}
/**

View File

@ -10,8 +10,7 @@ class LibeventReactor implements Reactor {
private $watchers = [];
private $immediates = [];
private $keepAliveCount = 0;
private $resolution = 1000;
private $isRunning = false;
private $state = self::STOPPED;
private $stopException;
private $onError;
private $onCoroutineResolution;
@ -23,7 +22,7 @@ class LibeventReactor implements Reactor {
public function __construct() {
// @codeCoverageIgnoreStart
if (!extension_loaded("libevent")) {
if (!\extension_loaded("libevent")) {
throw new \RuntimeException(
"The pecl libevent extension is required to use " . __CLASS__
);
@ -60,28 +59,31 @@ class LibeventReactor implements Reactor {
* {@inheritDoc}
*/
public function run(callable $onStart = null) {
if ($this->isRunning) {
return;
if ($this->state !== self::STOPPED) {
throw new \LogicException(
"Cannot run() recursively; event reactor already active"
);
}
$this->isRunning = true;
if ($onStart) {
$this->state = self::STARTING;
$onStartWatcherId = $this->immediately($onStart);
$this->tryImmediate($this->watchers[$onStartWatcherId]);
if (empty($this->keepAliveCount)) {
$this->isRunning = false;
return;
if (empty($this->keepAliveCount) && empty($this->stopException)) {
$this->state = self::STOPPED;
}
} else {
$this->state = self::RUNNING;
}
while ($this->isRunning) {
while ($this->state > self::STOPPED) {
$immediates = $this->immediates;
foreach ($immediates as $watcher) {
if (!$this->tryImmediate($watcher)) {
break;
}
}
if (!$this->isRunning || empty($this->keepAliveCount)) {
if (empty($this->keepAliveCount) || $this->state <= self::STOPPED) {
break;
}
$flags = \EVLOOP_ONCE | \EVLOOP_NONBLOCK;
@ -90,6 +92,13 @@ class LibeventReactor implements Reactor {
\event_base_loop($this->keepAliveBase, $flags);
}
if ($this->watchers) {
foreach (array_keys($this->watchers) as $watcherId) {
$this->cancel($watcherId);
}
}
$this->state = self::STOPPED;
if ($this->stopException) {
$e = $this->stopException;
$this->stopException = null;
@ -118,27 +127,36 @@ class LibeventReactor implements Reactor {
$this->onCallbackError($e);
}
return $this->isRunning;
return $this->state;
}
/**
* {@inheritDoc}
*/
public function tick($noWait = false) {
$this->isRunning = true;
if ($this->state) {
throw new \LogicException(
"Cannot tick() recursively; event reactor already active"
);
}
$this->state = self::TICKING;
$immediates = $this->immediates;
foreach ($immediates as $watcher) {
if (!$this->tryImmediate($watcher)) {
break;
}
}
if ($this->isRunning) {
// Check the conditional again because a manual stop() could've changed the state
if ($this->state) {
$flags = \EVLOOP_ONCE | \EVLOOP_NONBLOCK;
\event_base_loop($this->base, \EVLOOP_ONCE | \EVLOOP_NONBLOCK);
$flags = $noWait || !empty($this->immediates) ? (EVLOOP_ONCE | EVLOOP_NONBLOCK) : EVLOOP_ONCE;
\event_base_loop($this->keepAliveBase, $flags);
}
$this->isRunning = false;
$this->state = self::STOPPED;
if ($this->stopException) {
$e = $this->stopException;
$this->stopException = null;
@ -150,9 +168,15 @@ class LibeventReactor implements Reactor {
* {@inheritDoc}
*/
public function stop() {
\event_base_loopexit($this->base);
\event_base_loopexit($this->keepAliveBase);
$this->isRunning = false;
if ($this->state !== self::STOPPED) {
\event_base_loopexit($this->base);
\event_base_loopexit($this->keepAliveBase);
$this->state = self::STOPPING;
} else {
throw new \LogicException(
"Cannot stop(); event reactor not currently active"
);
}
}
/**
@ -193,7 +217,7 @@ class LibeventReactor implements Reactor {
public function once(callable $callback, $msDelay, array $options = []) {
assert(($msDelay >= 0), "\$msDelay at Argument 2 expects integer >= 0");
$watcher = $this->initWatcher(Watcher::TIMER_ONCE, $callback, $options);
$watcher->msDelay = ($msDelay * $this->resolution);
$watcher->msDelay = ($msDelay * 1000);
\event_timer_set($watcher->eventResource, $this->wrapCallback($watcher));
\event_base_set($watcher->eventResource, $watcher->eventBase);
if ($watcher->isEnabled) {
@ -281,7 +305,7 @@ class LibeventReactor implements Reactor {
if (isset($options["ms_delay"])) {
$msDelay = (int) $options["ms_delay"];
assert(($msDelay >= 0), "ms_delay option expects integer >= 0");
$msDelay = ($msDelay * $this->resolution);
$msDelay = ($msDelay * 1000);
} else {
$msDelay = $msInterval;
}

View File

@ -12,9 +12,9 @@ class NativeReactor implements Reactor {
private $writeStreams = [];
private $readWatchers = [];
private $writeWatchers = [];
private $isTimerSortNeeded;
private $isRunning = false;
private $isTicking = false;
private $timersEnabled = false;
private $isTimerSortNeeded = false;
private $state = self::STOPPED;
private $onCoroutineResolution;
private $hasExtPcntl;
private $signalState;
@ -48,25 +48,43 @@ class NativeReactor implements Reactor {
* {@inheritDoc}
*/
public function run(callable $onStart = null) {
if ($this->isRunning) {
return;
if ($this->state !== self::STOPPED) {
throw new \LogicException(
"Cannot run() recursively; event reactor already active"
);
}
$this->isRunning = true;
if ($onStart) {
$onStartWatcherId = $this->immediately($onStart);
$this->tryImmediate($this->watchers[$onStartWatcherId]);
if (empty($this->keepAliveCount)) {
$this->isRunning = false;
$this->state = self::STARTING;
$watcherId = $this->immediately($onStart);
if (!$this->tryImmediate($this->watchers[$watcherId]) || empty($this->keepAliveCount)) {
$this->unload();
return;
}
} else {
$this->state = self::RUNNING;
}
$this->enableTimers();
while ($this->isRunning) {
$this->tick();
while ($this->state > self::STOPPED) {
$this->doTick($noWait = false);
if (empty($this->keepAliveCount)) {
break;
}
}
$this->unload();
}
private function unload() {
if ($this->watchers) {
foreach (array_keys($this->watchers) as $watcherId) {
$this->cancel($watcherId);
}
}
$this->timersEnabled = false;
$this->state = self::STOPPED;
}
private function tryImmediate($watcher) {
@ -87,7 +105,7 @@ class NativeReactor implements Reactor {
\call_user_func($this->onCoroutineResolution, $e);
}
return $this->isRunning;
return $this->state;
}
private function enableTimers() {
@ -99,6 +117,7 @@ class NativeReactor implements Reactor {
$watcher->nextExecutionAt = $now + $watcher->msDelay;
$this->timerOrder[$watcherId] = $watcher->nextExecutionAt;
}
$this->timersEnabled = true;
$this->isTimerSortNeeded = true;
}
@ -106,16 +125,41 @@ class NativeReactor implements Reactor {
* {@inheritDoc}
*/
public function stop() {
$this->isRunning = $this->isTicking = false;
if ($this->state !== self::STOPPED) {
$this->state = self::STOPPED;
$this->timersEnabled = false;
} else {
throw new \LogicException(
"Cannot stop(); event reactor not currently active"
);
}
}
/**
* {@inheritDoc}
*/
public function tick($noWait = false) {
$noWait = (bool) $noWait;
$this->isTicking = true;
if (!$this->isRunning) {
if ($this->state) {
throw new \LogicException(
"Cannot tick() recursively; event reactor already active"
);
}
try {
$this->state = self::TICKING;
$this->doTick((bool) $noWait);
$this->state = self::STOPPED;
} catch (\Throwable $e) {
$this->unload();
throw $e;
} catch (\Exception $e) {
$this->unload();
throw $e;
}
}
private function doTick($noWait) {
if (empty($this->timersEnabled)) {
$this->enableTimers();
}
@ -130,7 +174,7 @@ class NativeReactor implements Reactor {
}
// If an immediately watcher called stop() we pull out here
if (!$this->isTicking) {
if ($this->state <= self::STOPPING) {
return;
}
@ -174,8 +218,6 @@ class NativeReactor implements Reactor {
if ($this->timerOrder || $this->immediates) {
$this->executeTimers();
}
$this->isTicking = false;
}
private function selectActionableStreams($timeout) {
@ -304,7 +346,7 @@ class NativeReactor implements Reactor {
$this->keepAliveCount += ($watcher->keepAlive && $watcher->isEnabled);
if ($watcher->isEnabled && $this->isRunning) {
if ($watcher->isEnabled && $this->state > self::STOPPED) {
$nextExecutionAt = microtime(true) + $watcher->msDelay;
$watcher->nextExecutionAt = $nextExecutionAt;
$this->timerOrder[$watcherId] = $nextExecutionAt;
@ -343,7 +385,7 @@ class NativeReactor implements Reactor {
$this->keepAliveCount += ($watcher->keepAlive && $watcher->isEnabled);
if ($watcher->isEnabled && $this->isRunning) {
if ($watcher->isEnabled && $this->state > self::STOPPED) {
$increment = (isset($watcher->msDelay) ? $watcher->msDelay : $watcher->msInterval);
$nextExecutionAt = microtime(true) + $increment;
$this->timerOrder[$watcherId] = $watcher->nextExecutionAt = $nextExecutionAt;

View File

@ -3,6 +3,12 @@
namespace Amp;
interface Reactor {
const STOPPING = -1;
const STOPPED = 0;
const STARTING = 1;
const TICKING = 2;
const RUNNING = 3;
/**
* Start the event reactor and assume program flow control
*

View File

@ -13,7 +13,7 @@ class UvReactor implements Reactor {
private $watchers;
private $keepAliveCount = 0;
private $streamIdPollMap = [];
private $isRunning = false;
private $state = self::STOPPED;
private $stopException;
private $isWindows;
private $immediates = [];
@ -27,7 +27,7 @@ class UvReactor implements Reactor {
public function __construct() {
// @codeCoverageIgnoreStart
if (!extension_loaded("uv")) {
if (!\extension_loaded("uv")) {
throw new \RuntimeException(
"The php-uv extension is required to use " . __CLASS__
);
@ -63,33 +63,43 @@ class UvReactor implements Reactor {
* {@inheritDoc}
*/
public function run(callable $onStart = null) {
if ($this->isRunning) {
return;
if ($this->state !== self::STOPPED) {
throw new \LogicException(
"Cannot run() recursively; event reactor already active"
);
}
$this->isRunning = true;
if ($onStart) {
$this->state = self::STARTING;
$onStartWatcherId = $this->immediately($onStart);
$this->tryImmediate($this->watchers[$onStartWatcherId]);
if (empty($this->keepAliveCount)) {
$this->isRunning = false;
return;
if (empty($this->keepAliveCount) && empty($this->stopException)) {
$this->state = self::STOPPED;
}
} else {
$this->state = self::RUNNING;
}
while ($this->isRunning) {
while ($this->state > self::STOPPED) {
$immediates = $this->immediates;
foreach ($immediates as $watcher) {
if (!$this->tryImmediate($watcher)) {
break;
}
}
if (empty($this->keepAliveCount)) {
if (empty($this->keepAliveCount) || $this->state <= self::STOPPED) {
break;
}
\uv_run($this->loop, \UV::RUN_DEFAULT | (empty($this->immediates) ? \UV::RUN_ONCE : \UV::RUN_NOWAIT));
}
if ($this->watchers) {
foreach (array_keys($this->watchers) as $watcherId) {
$this->cancel($watcherId);
}
}
$this->state = self::STOPPED;
if ($this->stopException) {
$e = $this->stopException;
$this->stopException = null;
@ -115,26 +125,36 @@ class UvReactor implements Reactor {
$this->onCallbackError($e);
}
return $this->isRunning;
return $this->state;
}
/**
* {@inheritDoc}
*/
public function tick($noWait = false) {
if ($this->state) {
throw new \LogicException(
"Cannot tick() recursively; event reactor already active"
);
}
$this->state = self::TICKING;
$noWait = (bool) $noWait;
$this->isRunning = true;
$immediates = $this->immediates;
foreach ($immediates as $watcher) {
if (!$this->tryImmediate($watcher)) {
break;
}
}
if ($this->isRunning) {
// Check the conditional again because a manual stop() could've changed the state
if ($this->state) {
$flags = $noWait || !empty($this->immediates) ? (\UV::RUN_NOWAIT | \UV::RUN_ONCE) : \UV::RUN_ONCE;
\uv_run($this->loop, $flags);
}
$this->isRunning = false;
$this->state = self::STOPPED;
if ($this->stopException) {
$e = $this->stopException;
$this->stopException = null;
@ -146,8 +166,14 @@ class UvReactor implements Reactor {
* {@inheritDoc}
*/
public function stop() {
\uv_stop($this->loop);
$this->isRunning = false;
if ($this->state !== self::STOPPED) {
\uv_stop($this->loop);
$this->state = self::STOPPING;
} else {
throw new \LogicException(
"Cannot stop(); event reactor not currently active"
);
}
}
/**

View File

@ -3,12 +3,43 @@
namespace Amp\Test;
abstract class ReactorTest extends \PHPUnit_Framework_TestCase {
public function testMultipleCallsToRunHaveNoEffect() {
/**
* @expectedException \LogicException
* @expectedExceptionMessage Cannot stop(); event reactor not currently active
*/
public function testStopThrowsIfNotCurrentlyRunning() {
\Amp\stop();
}
/**
* @expectedException \LogicException
* @expectedExceptionMessage Cannot run() recursively; event reactor already active
*/
public function testRecursiveRunCallThrows() {
\Amp\run(function () {
\Amp\run();
});
}
/**
* @expectedException \LogicException
* @expectedExceptionMessage Cannot tick() recursively; event reactor already active
*/
public function testRecursiveTickCallThrows() {
\Amp\immediately('\Amp\tick');
\Amp\tick();
}
/**
* @expectedException \LogicException
* @expectedExceptionMessage Cannot tick() recursively; event reactor already active
*/
public function testRecursiveTickCallThrowsInsideRun() {
\Amp\run(function () {
\Amp\tick();
});
}
public function testImmediatelyWatcherKeepAliveRunResult() {
$invoked = false;
\Amp\run(function () use (&$invoked) {
@ -260,23 +291,36 @@ abstract class ReactorTest extends \PHPUnit_Framework_TestCase {
});
}
public function testEnablingWatcherAllowsSubsequentInvocation() {
public function testThatWatchersAreCollectedWhenRunLoopExits() {
$increment = 0;
$watcherId = \Amp\immediately(function () use (&$increment) {
$increment++;
});
\Amp\disable($watcherId);
\Amp\once('\Amp\stop', $msDelay = 50);
$watcherId = \Amp\once(function () use (&$increment) {
$increment++;
}, 1);
\Amp\disable($watcherId);
$watcherId = \Amp\repeat(function () use (&$increment) {
$increment++;
}, 1);
\Amp\disable($watcherId);
$watcherId = \Amp\onReadable(STDIN, function () use (&$increment) {
$increment++;
});
\Amp\disable($watcherId);
$watcherId = \Amp\onWritable(STDOUT, function () use (&$increment) {
$increment++;
});
\Amp\disable($watcherId);
\Amp\run();
$this->assertEquals(0, $increment);
\Amp\enable($watcherId);
\Amp\once('\Amp\stop', $msDelay = 50);
\Amp\run();
$this->assertEquals(1, $increment);
$this->assertEquals(0, \Amp\info()["keep_alive"]);
}
public function testTimerWatcherParameterOrder() {
@ -329,21 +373,6 @@ abstract class ReactorTest extends \PHPUnit_Framework_TestCase {
$this->assertEquals(0, $increment);
}
public function testUnresolvedEventsAreReenabledOnRunFollowingPreviousStop() {
$increment = 0;
\Amp\once(function () use (&$increment) {
$increment++;
\Amp\stop();
}, $msDelay = 150);
\Amp\run('\Amp\stop');
$this->assertEquals(0, $increment);
\usleep(150000);
\Amp\run();
$this->assertEquals(1, $increment);
}
public function testImmediateExecution() {
$increment = 0;
\Amp\immediately(function () use (&$increment) {