mirror of
https://github.com/danog/amp.git
synced 2024-11-27 04:24:42 +01:00
Fix some bugs, refactor disable/unreference
This commit is contained in:
parent
acbe2c8237
commit
666bb332e1
@ -1,5 +0,0 @@
|
||||
<?php
|
||||
|
||||
namespace Amp\Loop;
|
||||
|
||||
class AlreadyRunningException extends \LogicException {}
|
@ -17,6 +17,16 @@ class Watcher {
|
||||
*/
|
||||
public $type;
|
||||
|
||||
/**
|
||||
* @var bool
|
||||
*/
|
||||
public $enabled = true;
|
||||
|
||||
/**
|
||||
* @var bool
|
||||
*/
|
||||
public $referenced = true;
|
||||
|
||||
/**
|
||||
* @var string
|
||||
*/
|
||||
|
@ -24,7 +24,7 @@ class NativeLoop implements Driver {
|
||||
private $watchers = [];
|
||||
|
||||
/**
|
||||
* @var string[]
|
||||
* @var \Amp\Loop\Internal\Watcher[]
|
||||
*/
|
||||
private $deferQueue = [];
|
||||
|
||||
@ -63,20 +63,15 @@ class NativeLoop implements Driver {
|
||||
*/
|
||||
private $signalWatchers = [];
|
||||
|
||||
/**
|
||||
* @var \Amp\Loop\Internal\Watcher[]
|
||||
*/
|
||||
private $unreferenced = [];
|
||||
|
||||
/**
|
||||
* @var callable
|
||||
*/
|
||||
private $errorHandler;
|
||||
|
||||
/**
|
||||
* @var bool
|
||||
* @var int
|
||||
*/
|
||||
private $running = false;
|
||||
private $running = 0;
|
||||
|
||||
/**
|
||||
* @var bool
|
||||
@ -90,25 +85,19 @@ class NativeLoop implements Driver {
|
||||
|
||||
/**
|
||||
* {@inheritdoc}
|
||||
*
|
||||
* @throws \Amp\Loop\AlreadyRunningException
|
||||
*/
|
||||
public function run() {
|
||||
if ($this->running) {
|
||||
throw new AlreadyRunningException("Cannot run loop recursively; loop already running");
|
||||
}
|
||||
|
||||
$this->running = true;
|
||||
$previous = $this->running++;
|
||||
|
||||
try {
|
||||
while ($this->running) {
|
||||
while ($this->running > $previous) {
|
||||
if ($this->isEmpty()) {
|
||||
return;
|
||||
}
|
||||
$this->tick();
|
||||
}
|
||||
} finally {
|
||||
$this->stop();
|
||||
$this->running = $previous;
|
||||
}
|
||||
}
|
||||
|
||||
@ -116,22 +105,20 @@ class NativeLoop implements Driver {
|
||||
* {@inheritdoc}
|
||||
*/
|
||||
public function stop() {
|
||||
$this->running = false;
|
||||
--$this->running;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return bool True if no referenced watchers remain in the loop.
|
||||
* @return bool True if no enabled and referenced watchers remain in the loop.
|
||||
*/
|
||||
private function isEmpty() {
|
||||
if (empty($this->watchers)) {
|
||||
return true;
|
||||
foreach ($this->watchers as $watcher) {
|
||||
if ($watcher->enabled && $watcher->referenced) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
if (empty($this->unreferenced)) {
|
||||
return false;
|
||||
}
|
||||
|
||||
return \count($this->watchers) === \count($this->unreferenced);
|
||||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
@ -226,7 +213,9 @@ class NativeLoop implements Driver {
|
||||
*/
|
||||
private function getTimeout() {
|
||||
while (!$this->timerQueue->isEmpty()) {
|
||||
list($id, $timeout) = $this->timerQueue->top();
|
||||
list($watcher, $timeout) = $this->timerQueue->top();
|
||||
|
||||
$id = $watcher->id;
|
||||
|
||||
if (!isset($this->timerExpires[$id]) || $timeout !== $this->timerExpires[$id]) {
|
||||
$this->timerQueue->extract(); // Timer was removed from queue.
|
||||
@ -251,13 +240,15 @@ class NativeLoop implements Driver {
|
||||
private function invokeDeferred() {
|
||||
$queue = $this->deferQueue;
|
||||
|
||||
foreach ($queue as $id) {
|
||||
if (!isset($this->watchers[$id]) || !isset($this->deferQueue[$id])) {
|
||||
foreach ($queue as $watcher) {
|
||||
$id = $watcher->id;
|
||||
|
||||
if (!isset($this->deferQueue[$id])) {
|
||||
continue;
|
||||
}
|
||||
|
||||
$watcher = $this->watchers[$id];
|
||||
unset($this->watchers[$id], $this->deferQueue[$id], $this->unreferenced[$id]);
|
||||
unset($this->watchers[$id], $this->deferQueue[$id]);
|
||||
|
||||
$callback = $watcher->callback;
|
||||
$callback($watcher->id, $watcher->data);
|
||||
@ -271,8 +262,10 @@ class NativeLoop implements Driver {
|
||||
$time = (int) (\microtime(true) * self::MILLISEC_PER_SEC);
|
||||
|
||||
while (!$this->timerQueue->isEmpty()) {
|
||||
list($id, $timeout) = $this->timerQueue->top();
|
||||
|
||||
list($watcher, $timeout) = $this->timerQueue->top();
|
||||
|
||||
$id = $watcher->id;
|
||||
|
||||
if (!isset($this->timerExpires[$id]) || $timeout !== $this->timerExpires[$id]) {
|
||||
$this->timerQueue->extract(); // Timer was removed from queue.
|
||||
continue;
|
||||
@ -285,14 +278,12 @@ class NativeLoop implements Driver {
|
||||
// Remove and execute timer. Replace timer if persistent.
|
||||
$this->timerQueue->extract();
|
||||
|
||||
$watcher = $this->watchers[$id];
|
||||
|
||||
if ($watcher->type === Watcher::REPEAT) {
|
||||
$timeout = $time + $watcher->value;
|
||||
$this->timerQueue->insert([$id, $timeout], -$timeout);
|
||||
$this->timerQueue->insert([$watcher, $timeout], -$timeout);
|
||||
$this->timerExpires[$id] = $timeout;
|
||||
} else {
|
||||
unset($this->watchers[$id], $this->timerExpires[$id], $this->unreferenced[$id]);
|
||||
unset($this->watchers[$id], $this->timerExpires[$id]);
|
||||
}
|
||||
|
||||
// Execute the timer.
|
||||
@ -312,7 +303,7 @@ class NativeLoop implements Driver {
|
||||
$watcher->data = $data;
|
||||
|
||||
$this->watchers[$watcher->id] = $watcher;
|
||||
$this->deferQueue[$watcher->id] = $watcher->id;
|
||||
$this->deferQueue[$watcher->id] = $watcher;
|
||||
|
||||
return $watcher->id;
|
||||
}
|
||||
@ -339,7 +330,7 @@ class NativeLoop implements Driver {
|
||||
$expiration = (int) (\microtime(true) * self::MILLISEC_PER_SEC) + $watcher->value;
|
||||
|
||||
$this->timerExpires[$watcher->id] = $expiration;
|
||||
$this->timerQueue->insert([$watcher->id, $expiration], -$expiration);
|
||||
$this->timerQueue->insert([$watcher, $expiration], -$expiration);
|
||||
|
||||
return $watcher->id;
|
||||
}
|
||||
@ -366,7 +357,7 @@ class NativeLoop implements Driver {
|
||||
$expiration = (int) (\microtime(true) * self::MILLISEC_PER_SEC) + $watcher->value;
|
||||
|
||||
$this->timerExpires[$watcher->id] = $expiration;
|
||||
$this->timerQueue->insert([$watcher->id, $expiration], -$expiration);
|
||||
$this->timerQueue->insert([$watcher, $expiration], -$expiration);
|
||||
|
||||
return $watcher->id;
|
||||
}
|
||||
@ -413,7 +404,7 @@ class NativeLoop implements Driver {
|
||||
* {@inheritdoc}
|
||||
*
|
||||
* @throws \Interop\Async\Loop\UnsupportedFeatureException If the pcntl extension is not available.
|
||||
* @throws \Amp\Loop\SignalHandlerException If creating the backend signal handler fails.
|
||||
* @throws \RuntimeException If creating the backend signal handler fails.
|
||||
*/
|
||||
public function onSignal($signo, callable $callback, $data = null) {
|
||||
if (!$this->signalHandling) {
|
||||
@ -421,7 +412,7 @@ class NativeLoop implements Driver {
|
||||
}
|
||||
|
||||
$watcher = new Watcher;
|
||||
$watcher->type = Watcher::WRITABLE;
|
||||
$watcher->type = Watcher::SIGNAL;
|
||||
$watcher->id = $this->nextId++;
|
||||
$watcher->callback = $callback;
|
||||
$watcher->value = $signo;
|
||||
@ -436,7 +427,7 @@ class NativeLoop implements Driver {
|
||||
/**
|
||||
* @param \Amp\Loop\Internal\Watcher $watcher
|
||||
*
|
||||
* @throws \Amp\Loop\SignalHandlerException If creating the backend signal handler fails.
|
||||
* @throws \RuntimeException If creating the backend signal handler fails.
|
||||
*/
|
||||
private function enableSignal(Watcher $watcher) {
|
||||
if (!isset($this->signalWatchers[$watcher->value])) {
|
||||
@ -450,7 +441,7 @@ class NativeLoop implements Driver {
|
||||
$callback($watcher->id, $signo, $watcher->data);
|
||||
}
|
||||
})) {
|
||||
throw new SignalHandlerException("Failed to register signal handler");
|
||||
throw new \RuntimeException("Failed to register signal handler");
|
||||
}
|
||||
}
|
||||
|
||||
@ -491,13 +482,13 @@ class NativeLoop implements Driver {
|
||||
switch ($watcher->type) {
|
||||
case Watcher::READABLE:
|
||||
$streamId = (int) $watcher->value;
|
||||
$this->readWatchers[$streamId][$watcher->id] = $watcher->id;
|
||||
$this->readWatchers[$streamId][$watcher->id] = $watcher;
|
||||
$this->readStreams[$streamId] = $watcher->value;
|
||||
break;
|
||||
|
||||
case Watcher::WRITABLE:
|
||||
$streamId = (int) $watcher->value;
|
||||
$this->writeWatchers[$streamId][$watcher->id] = $watcher->id;
|
||||
$this->writeWatchers[$streamId][$watcher->id] = $watcher;
|
||||
$this->writeStreams[$streamId] = $watcher->value;
|
||||
break;
|
||||
|
||||
@ -509,17 +500,21 @@ class NativeLoop implements Driver {
|
||||
|
||||
$expiration = (int) (\microtime(true) * self::MILLISEC_PER_SEC) + $watcher->value;
|
||||
$this->timerExpires[$watcher->id] = $expiration;
|
||||
$this->timerQueue->insert([$watcher->id, $expiration], -$expiration);
|
||||
$this->timerQueue->insert([$watcher, $expiration], -$expiration);
|
||||
break;
|
||||
|
||||
case Watcher::DEFER:
|
||||
$this->deferQueue[$watcher->id] = $watcher->id;
|
||||
$this->deferQueue[$watcher->id] = $watcher;
|
||||
break;
|
||||
|
||||
case Watcher::SIGNAL:
|
||||
$this->enableSignal($watcher);
|
||||
break;
|
||||
|
||||
default: throw new \RuntimeException("Unknown watcher type");
|
||||
}
|
||||
|
||||
$watcher->enabled = true;
|
||||
}
|
||||
|
||||
/**
|
||||
@ -537,7 +532,7 @@ class NativeLoop implements Driver {
|
||||
$streamId = (int) $watcher->value;
|
||||
unset($this->readWatchers[$streamId][$watcher->id]);
|
||||
if (empty($this->readWatchers[$streamId])) {
|
||||
unset($this->writeWatchers[$streamId], $this->readStreams[$streamId]);
|
||||
unset($this->readWatchers[$streamId], $this->readStreams[$streamId]);
|
||||
}
|
||||
break;
|
||||
|
||||
@ -561,7 +556,11 @@ class NativeLoop implements Driver {
|
||||
case Watcher::SIGNAL:
|
||||
$this->disableSignal($watcher);
|
||||
break;
|
||||
|
||||
default: throw new \RuntimeException("Unknown watcher type");
|
||||
}
|
||||
|
||||
$watcher->enabled = false;
|
||||
}
|
||||
|
||||
/**
|
||||
@ -569,14 +568,18 @@ class NativeLoop implements Driver {
|
||||
*/
|
||||
public function cancel($watcherIdentifier) {
|
||||
$this->disable($watcherIdentifier);
|
||||
unset($this->watchers[$watcherIdentifier], $this->unreferenced[$watcherIdentifier]);
|
||||
unset($this->watchers[$watcherIdentifier]);
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritdoc}
|
||||
*/
|
||||
public function reference($watcherIdentifier) {
|
||||
unset($this->unreferenced[$watcherIdentifier]);
|
||||
if (!isset($this->watchers[$watcherIdentifier])) {
|
||||
return;
|
||||
}
|
||||
|
||||
$this->watchers[$watcherIdentifier]->referenced = true;
|
||||
}
|
||||
|
||||
/**
|
||||
@ -587,7 +590,7 @@ class NativeLoop implements Driver {
|
||||
return;
|
||||
}
|
||||
|
||||
$this->unreferenced[$watcherIdentifier] = $watcherIdentifier;
|
||||
$this->watchers[$watcherIdentifier]->referenced = false;
|
||||
}
|
||||
|
||||
/**
|
||||
@ -595,8 +598,8 @@ class NativeLoop implements Driver {
|
||||
*/
|
||||
public function info() {
|
||||
$watchers = [
|
||||
"referenced" => \count($this->watchers) - \count($this->unreferenced),
|
||||
"unreferenced" => \count($this->unreferenced),
|
||||
"referenced" => 0,
|
||||
"unreferenced" => 0,
|
||||
];
|
||||
|
||||
$defer = $delay = $repeat = $onReadable = $onWritable = $onSignal = [
|
||||
@ -606,37 +609,26 @@ class NativeLoop implements Driver {
|
||||
|
||||
foreach ($this->watchers as $watcher) {
|
||||
switch ($watcher->type) {
|
||||
case Watcher::READABLE:
|
||||
if (isset($this->readWatchers[(int) $watcher->value][$watcher->id])) {
|
||||
++$onReadable["enabled"];
|
||||
} else {
|
||||
++$onReadable["disabled"];
|
||||
}
|
||||
break;
|
||||
case Watcher::READABLE: $array = &$onReadable; break;
|
||||
case Watcher::WRITABLE: $array = &$onWritable; break;
|
||||
case Watcher::SIGNAL: $array = &$onSignal; break;
|
||||
case Watcher::DEFER: $array = &$defer; break;
|
||||
case Watcher::DELAY: $array = &$delay; break;
|
||||
case Watcher::REPEAT: $array = &$repeat; break;
|
||||
|
||||
case Watcher::WRITABLE:
|
||||
if (isset($this->writeWatchers[(int) $watcher->value][$watcher->id])) {
|
||||
++$onWritable["enabled"];
|
||||
} else {
|
||||
++$onWritable["disabled"];
|
||||
}
|
||||
break;
|
||||
default: throw new \RuntimeException("Unknown watcher type");
|
||||
}
|
||||
|
||||
case Watcher::DEFER:
|
||||
++$defer["enabled"];
|
||||
break;
|
||||
if ($watcher->enabled) {
|
||||
++$array["enabled"];
|
||||
|
||||
case Watcher::DELAY:
|
||||
++$delay["enabled"];
|
||||
break;
|
||||
|
||||
case Watcher::REPEAT:
|
||||
if (isset($this->timerExpires[$watcher->id])) {
|
||||
++$repeat["enabled"];
|
||||
} else {
|
||||
++$repeat["disabled"];
|
||||
}
|
||||
break;
|
||||
if ($watcher->referenced) {
|
||||
++$watchers["referenced"];
|
||||
} else {
|
||||
++$watchers["unreferenced"];
|
||||
}
|
||||
} else {
|
||||
++$array["disabled"];
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1,5 +0,0 @@
|
||||
<?php
|
||||
|
||||
namespace Amp\Loop;
|
||||
|
||||
class SignalHandlerException extends \RuntimeException {}
|
Loading…
Reference in New Issue
Block a user