diff --git a/lib/UvReactor.php b/lib/UvReactor.php index 314fa6d..1aa0575 100644 --- a/lib/UvReactor.php +++ b/lib/UvReactor.php @@ -14,11 +14,13 @@ class UvReactor implements SignalReactor { private $stopException; private $resolution = 1000; private $isWindows; + private $immediates = []; private static $MODE_ONCE = 0; private static $MODE_REPEAT = 1; private static $MODE_STREAM = 2; private static $MODE_SIGNAL = 3; + private static $MODE_IMMEDIATE = 4; public function __construct($newLoop = false) { $this->loop = $newLoop ? uv_loop_new() : uv_default_loop(); @@ -48,8 +50,13 @@ class UvReactor implements SignalReactor { if ($onStart) { $this->immediately(function() use ($onStart) { $onStart($this); }); } - uv_run($this->loop); - $this->isRunning = false; + + while ($this->isRunning) { + if ($this->immediates && !$this->doImmediates()) { + break; + } + uv_run($this->loop, \UV::RUN_NOWAIT | \UV::RUN_ONCE); + } if ($this->stopException) { $e = $this->stopException; @@ -58,6 +65,23 @@ class UvReactor implements SignalReactor { } } + private function doImmediates() { + $immediates = $this->immediates; + foreach ($immediates as $watcherId => $callback) { + $callback($this); + unset( + $this->immediates[$watcherId], + $this->watchers[$watcherId] + ); + if (!$this->isRunning) { + // If a watcher stops the reactor break out of the loop + break; + } + } + + return $this->isRunning; + } + /** * Execute a single event loop iteration * @@ -65,8 +89,16 @@ class UvReactor implements SignalReactor { * @return void */ public function tick() { + if ($this->isRunning) { + return; + } + $this->isRunning = true; - uv_run_once($this->loop); + + if (empty($this->immediates) || $this->doImmediates()) { + uv_run($this->loop, \UV::RUN_NOWAIT | \UV::RUN_ONCE); + } + $this->isRunning = false; if ($this->stopException) { @@ -83,6 +115,7 @@ class UvReactor implements SignalReactor { */ public function stop() { uv_stop($this->loop); + $this->isRunning = false; } /** @@ -92,7 +125,18 @@ class UvReactor implements SignalReactor { * @return int Returns a unique integer watcher ID */ public function immediately(callable $callback) { - return $this->startTimer($callback, $msDelay = 0, $msInterval = 0, self::$MODE_ONCE); + $watcherId = $this->lastWatcherId++; + $this->immediates[$watcherId] = $callback; + + $watcher = new \StdClass; + $watcher->id = $watcherId; + $watcher->mode = self::$MODE_IMMEDIATE; + $watcher->callback = $callback; + $watcher->isEnabled = true; + + $this->watchers[$watcher->id] = $watcher; + + return $watcherId; } /** @@ -327,6 +371,9 @@ class UvReactor implements SignalReactor { case self::$MODE_SIGNAL: uv_signal_stop($watcher->uvStruct); break; + case self::$MODE_IMMEDIATE: + unset($this->immediates[$watcherId]); + break; default: uv_timer_stop($watcher->uvStruct); break; @@ -365,6 +412,9 @@ class UvReactor implements SignalReactor { case self::$MODE_SIGNAL: uv_signal_stop($watcher->uvStruct); break; + case self::$MODE_IMMEDIATE: + unset($this->immediates[$watcher->id]); + break; default: uv_timer_stop($watcher->uvStruct); break; @@ -397,6 +447,9 @@ class UvReactor implements SignalReactor { case self::$MODE_SIGNAL: uv_signal_start($watcher->uvStruct, $watcher->callback, $watcher->signo); break; + case self::$MODE_IMMEDIATE: + $this->immediates[$watcher->id] = $watcher->callback; + break; default: uv_timer_start($watcher->uvStruct, $watcher->msDelay, $watcher->msInterval, $watcher->callback); break;