From 93424c1c6e64e2b84d2ccc6a26353bb971250847 Mon Sep 17 00:00:00 2001 From: Aaron Piotrowski Date: Tue, 7 Jun 2016 12:24:53 -0500 Subject: [PATCH] Enable events in next tick --- lib/NativeLoop.php | 222 +++++++++++++++++++++++---------------------- 1 file changed, 114 insertions(+), 108 deletions(-) diff --git a/lib/NativeLoop.php b/lib/NativeLoop.php index 1ae0a45..3bb3420 100644 --- a/lib/NativeLoop.php +++ b/lib/NativeLoop.php @@ -24,6 +24,11 @@ class NativeLoop implements Driver { */ private $watchers = []; + /** + * @var \Amp\Loop\Internal\Watcher[] + */ + private $enableQueue = []; + /** * @var \Amp\Loop\Internal\Watcher[] */ @@ -131,7 +136,11 @@ class NativeLoop implements Driver { $this->invokeDeferred(); } - $this->selectStreams($this->readStreams, $this->writeStreams, $this->getTimeout()); + $this->selectStreams( + $this->readStreams, + $this->writeStreams, + empty($this->enableQueue) ? $this->getTimeout() : 0 + ); if (!empty($this->timerExpires)) { $this->invokeTimers(); @@ -140,6 +149,10 @@ class NativeLoop implements Driver { if ($this->signalHandling) { \pcntl_signal_dispatch(); } + + if (!empty($this->enableQueue)) { + $this->enableWatchers(); + } } catch (\Throwable $exception) { if (null === $this->errorHandler) { throw $exception; @@ -184,6 +197,10 @@ class NativeLoop implements Driver { $streamId = (int) $stream; if (isset($this->readWatchers[$streamId])) { foreach ($this->readWatchers[$streamId] as $watcher) { + if (!isset($this->readWatchers[$streamId][$watcher->id])) { + continue; // Watcher disabled by another IO watcher. + } + $callback = $watcher->callback; $callback($watcher->id, $stream, $watcher->data); } @@ -194,6 +211,10 @@ class NativeLoop implements Driver { $streamId = (int) $stream; if (isset($this->writeWatchers[$streamId])) { foreach ($this->writeWatchers[$streamId] as $watcher) { + if (!isset($this->writeWatchers[$streamId][$watcher->id])) { + continue; // Watcher disabled by another IO watcher. + } + $callback = $watcher->callback; $callback($watcher->id, $stream, $watcher->data); } @@ -214,22 +235,22 @@ class NativeLoop implements Driver { */ private function getTimeout() { while (!$this->timerQueue->isEmpty()) { - list($watcher, $timeout) = $this->timerQueue->top(); + list($watcher, $expiration) = $this->timerQueue->top(); $id = $watcher->id; - if (!isset($this->timerExpires[$id]) || $timeout !== $this->timerExpires[$id]) { + if (!isset($this->timerExpires[$id]) || $expiration !== $this->timerExpires[$id]) { $this->timerQueue->extract(); // Timer was removed from queue. continue; } - $timeout -= (int) (\microtime(true) * self::MILLISEC_PER_SEC); + $expiration -= (int) (\microtime(true) * self::MILLISEC_PER_SEC); - if ($timeout < 0) { + if ($expiration < 0) { return 0; } - return $timeout; + return $expiration; } return -1; @@ -239,13 +260,11 @@ class NativeLoop implements Driver { * Invokes all pending defer watchers. */ private function invokeDeferred() { - $queue = $this->deferQueue; - - foreach ($queue as $watcher) { + foreach ($this->deferQueue as $watcher) { $id = $watcher->id; if (!isset($this->deferQueue[$id])) { - continue; + continue; // Watcher disabled by another defer watcher. } $watcher = $this->watchers[$id]; @@ -263,11 +282,11 @@ class NativeLoop implements Driver { $time = (int) (\microtime(true) * self::MILLISEC_PER_SEC); while (!$this->timerQueue->isEmpty()) { - list($watcher, $timeout) = $this->timerQueue->top(); + list($watcher, $expiration) = $this->timerQueue->top(); $id = $watcher->id; - if (!isset($this->timerExpires[$id]) || $timeout !== $this->timerExpires[$id]) { + if (!isset($this->timerExpires[$id]) || $expiration !== $this->timerExpires[$id]) { $this->timerQueue->extract(); // Timer was removed from queue. continue; } @@ -279,10 +298,8 @@ class NativeLoop implements Driver { // Remove and execute timer. Replace timer if persistent. $this->timerQueue->extract(); - if ($watcher->type === Watcher::REPEAT) { - $timeout = $time + $watcher->value; - $this->timerQueue->insert([$watcher, $timeout], -$timeout); - $this->timerExpires[$id] = $timeout; + if ($watcher->type & Watcher::REPEAT) { + $this->enableQueue[$id] = $watcher; } else { unset($this->watchers[$id], $this->timerExpires[$id]); } @@ -293,6 +310,63 @@ class NativeLoop implements Driver { } } + /** + * Enables any watchers queued to be enabled on the next tick. + */ + private function enableWatchers() { + $enableQueue = $this->enableQueue; + $this->enableQueue = []; + + foreach ($enableQueue as $watcher) { + switch ($watcher->type) { + case Watcher::READABLE: + $streamId = (int) $watcher->value; + $this->readWatchers[$streamId][$watcher->id] = $watcher; + $this->readStreams[$streamId] = $watcher->value; + break; + + case Watcher::WRITABLE: + $streamId = (int) $watcher->value; + $this->writeWatchers[$streamId][$watcher->id] = $watcher; + $this->writeStreams[$streamId] = $watcher->value; + break; + + case Watcher::DELAY: + case Watcher::REPEAT: + $priority = (\microtime(true) * self::MILLISEC_PER_SEC) + $watcher->value; + $expiration = (int) $priority; + $this->timerExpires[$watcher->id] = $expiration; + $this->timerQueue->insert([$watcher, $expiration], -$priority); + break; + + case Watcher::DEFER: + $this->deferQueue[$watcher->id] = $watcher; + break; + + case Watcher::SIGNAL: + if (!isset($this->signalWatchers[$watcher->value])) { + if (!@\pcntl_signal($watcher->value, function ($signo) { + foreach ($this->signalWatchers[$signo] as $watcher) { + if (!isset($this->watchers[$watcher->id])) { + continue; + } + + $callback = $watcher->callback; + $callback($watcher->id, $signo, $watcher->data); + } + })) { + throw new \RuntimeException("Failed to register signal handler"); + } + } + + $this->signalWatchers[$watcher->value][$watcher->id] = $watcher; + break; + + default: throw new \DomainException("Unknown watcher type"); + } + } + } + /** * {@inheritdoc} */ @@ -304,7 +378,7 @@ class NativeLoop implements Driver { $watcher->data = $data; $this->watchers[$watcher->id] = $watcher; - $this->deferQueue[$watcher->id] = $watcher; + $this->enableQueue[$watcher->id] = $watcher; return $watcher->id; } @@ -327,11 +401,7 @@ class NativeLoop implements Driver { $watcher->data = $data; $this->watchers[$watcher->id] = $watcher; - - $expiration = (int) (\microtime(true) * self::MILLISEC_PER_SEC) + $watcher->value; - - $this->timerExpires[$watcher->id] = $expiration; - $this->timerQueue->insert([$watcher, $expiration], -$expiration); + $this->enableQueue[$watcher->id] = $watcher; return $watcher->id; } @@ -354,11 +424,7 @@ class NativeLoop implements Driver { $watcher->data = $data; $this->watchers[$watcher->id] = $watcher; - - $expiration = (int) (\microtime(true) * self::MILLISEC_PER_SEC) + $watcher->value; - - $this->timerExpires[$watcher->id] = $expiration; - $this->timerQueue->insert([$watcher, $expiration], -$expiration); + $this->enableQueue[$watcher->id] = $watcher; return $watcher->id; } @@ -375,9 +441,7 @@ class NativeLoop implements Driver { $watcher->data = $data; $this->watchers[$watcher->id] = $watcher; - $streamId = (int) $watcher->value; - $this->readWatchers[$streamId][$watcher->id] = $watcher; - $this->readStreams[$streamId] = $watcher->value; + $this->enableQueue[$watcher->id] = $watcher; return $watcher->id; } @@ -394,9 +458,7 @@ class NativeLoop implements Driver { $watcher->data = $data; $this->watchers[$watcher->id] = $watcher; - $streamId = (int) $watcher->value; - $this->writeWatchers[$streamId][$watcher->id] = $watcher; - $this->writeStreams[$streamId] = $watcher->value; + $this->enableQueue[$watcher->id] = $watcher; return $watcher->id; } @@ -419,50 +481,12 @@ class NativeLoop implements Driver { $watcher->value = $signo; $watcher->data = $data; - $this->enableSignal($watcher); $this->watchers[$watcher->id] = $watcher; + $this->enableQueue[$watcher->id] = $watcher; return $watcher->id; } - /** - * @param \Amp\Loop\Internal\Watcher $watcher - * - * @throws \RuntimeException If creating the backend signal handler fails. - */ - private function enableSignal(Watcher $watcher) { - if (!isset($this->signalWatchers[$watcher->value])) { - if (!@\pcntl_signal($watcher->value, function ($signo) { - foreach ($this->signalWatchers[$signo] as $watcher) { - if (!isset($this->watchers[$watcher->id])) { - continue; - } - - $callback = $watcher->callback; - $callback($watcher->id, $signo, $watcher->data); - } - })) { - throw new \RuntimeException("Failed to register signal handler"); - } - } - - $this->signalWatchers[$watcher->value][$watcher->id] = $watcher; - } - - /** - * @param \Amp\Loop\Internal\Watcher $watcher - */ - private function disableSignal(Watcher $watcher) { - if (isset($this->signalWatchers[$watcher->value])) { - unset($this->signalWatchers[$watcher->value][$watcher->id]); - - if (empty($this->signalWatchers[$watcher->value])) { - unset($this->signalWatchers[$watcher->value]); - @\pcntl_signal($watcher->value, \SIG_DFL); - } - } - } - /** * {@inheritdoc} */ @@ -484,38 +508,8 @@ class NativeLoop implements Driver { return; // Watcher already enabled. } - switch ($watcher->type) { - case Watcher::READABLE: - $streamId = (int) $watcher->value; - $this->readWatchers[$streamId][$watcher->id] = $watcher; - $this->readStreams[$streamId] = $watcher->value; - break; - - case Watcher::WRITABLE: - $streamId = (int) $watcher->value; - $this->writeWatchers[$streamId][$watcher->id] = $watcher; - $this->writeStreams[$streamId] = $watcher->value; - break; - - case Watcher::DELAY: - case Watcher::REPEAT: - $expiration = (int) (\microtime(true) * self::MILLISEC_PER_SEC) + $watcher->value; - $this->timerExpires[$watcher->id] = $expiration; - $this->timerQueue->insert([$watcher, $expiration], -$expiration); - break; - - case Watcher::DEFER: - $this->deferQueue[$watcher->id] = $watcher; - break; - - case Watcher::SIGNAL: - $this->enableSignal($watcher); - break; - - default: throw new \RuntimeException("Unknown watcher type"); - } - $watcher->enabled = true; + $this->enableQueue[$watcher->id] = $watcher; } /** @@ -532,6 +526,13 @@ class NativeLoop implements Driver { return; // Watcher already disabled. } + $watcher->enabled = false; + + if (isset($this->enableQueue[$watcher->id])) { + unset($this->enableQueue[$watcher->id]); + return; // Watcher was only queued to be enabled. + } + switch ($watcher->type) { case Watcher::READABLE: $streamId = (int) $watcher->value; @@ -559,13 +560,18 @@ class NativeLoop implements Driver { break; case Watcher::SIGNAL: - $this->disableSignal($watcher); + if (isset($this->signalWatchers[$watcher->value])) { + unset($this->signalWatchers[$watcher->value][$watcher->id]); + + if (empty($this->signalWatchers[$watcher->value])) { + unset($this->signalWatchers[$watcher->value]); + @\pcntl_signal($watcher->value, \SIG_DFL); + } + } break; - default: throw new \RuntimeException("Unknown watcher type"); + default: throw new \DomainException("Unknown watcher type"); } - - $watcher->enabled = false; } /** @@ -625,7 +631,7 @@ class NativeLoop implements Driver { case Watcher::DELAY: $array = &$delay; break; case Watcher::REPEAT: $array = &$repeat; break; - default: throw new \RuntimeException("Unknown watcher type"); + default: throw new \DomainException("Unknown watcher type"); } if ($watcher->enabled) {