diff --git a/.travis.yml b/.travis.yml index c38702f..f27e7b6 100644 --- a/.travis.yml +++ b/.travis.yml @@ -17,6 +17,40 @@ env: - DEPS=highest install: + - git clone https://github.com/libuv/libuv; + pushd libuv; + git checkout $(git describe --tags); + ./autogen.sh; + ./configure --prefix=$(dirname `pwd`)/libuv-install; + make; + make install; + popd; + git clone https://github.com/bwoebi/php-uv.git; + pushd php-uv; + phpize; + ./configure --with-uv=$(dirname `pwd`)/libuv-install; + make; + make install; + popd; + echo "extension=uv.so" >> "$(php -r 'echo php_ini_loaded_file();')"; + + - curl -LS https://pecl.php.net/get/ev | tar -xz; + pushd ev-*; + phpize; + ./configure; + make; + make install; + popd; + echo "extension=ev.so" >> "$(php -r 'echo php_ini_loaded_file();')"; + - curl -LS https://pecl.php.net/get/event | tar -xz; + pushd event-*; + phpize; + ./configure --with-event-core --with-event-extra --with-event-pthreads; + make; + make install; + popd; + echo "extension=event.so" >> "$(php -r 'echo php_ini_loaded_file();')"; + - if [ "$DEPS" = "lowest" ]; then composer update -n --prefer-source --prefer-lowest; else diff --git a/lib/EvLoop.php b/lib/EvLoop.php new file mode 100644 index 0000000..cec20ea --- /dev/null +++ b/lib/EvLoop.php @@ -0,0 +1,190 @@ +handle = new \EvLoop; + + if (self::$activeSignals === null) { + self::$activeSignals = &$this->signals; + } + + $this->ioCallback = function (\EvIO $event) { + /** @var \Amp\Loop\Internal\Watcher $watcher */ + $watcher = $event->data; + + $callback = $watcher->callback; + $callback($watcher->id, $watcher->value, $watcher->data); + }; + + $this->timerCallback = function (\EvTimer $event) { + /** @var \Amp\Loop\Internal\Watcher $watcher */ + $watcher = $event->data; + + if ($watcher->type & Watcher::DELAY) { + $this->cancel($watcher->id); + } + + $callback = $watcher->callback; + $callback($watcher->id, $watcher->data); + }; + + $this->signalCallback = function (\EvSignal $event) { + /** @var \Amp\Loop\Internal\Watcher $watcher */ + $watcher = $event->data; + + $callback = $watcher->callback; + $callback($watcher->id, $watcher->value, $watcher->data); + }; + } + + public function __destruct() { + foreach ($this->events as $event) { + $event->stop(); + } + } + + /** + * {@inheritdoc} + */ + public function run() { + $active = self::$activeSignals; + + foreach ($active as $event) { + $event->stop(); + } + + self::$activeSignals = &$this->signals; + + foreach ($this->signals as $event) { + $event->start(); + } + + try { + parent::run(); + } finally { + foreach ($this->signals as $event) { + $event->stop(); + } + + self::$activeSignals = &$active; + + foreach ($active as $event) { + $event->start(); + } + } + } + + /** + * {@inheritdoc} + */ + public function stop() { + $this->handle->stop(); + parent::stop(); + } + + /** + * {@inheritdoc} + */ + protected function dispatch($blocking) { + $this->handle->run($blocking ? \Ev::RUN_ONCE : \Ev::RUN_ONCE | \Ev::RUN_NOWAIT); + } + + /** + * {@inheritdoc} + */ + protected function activate(array $watchers) { + foreach ($watchers as $watcher) { + if (!isset($this->events[$id = $watcher->id])) { + switch ($watcher->type) { + case Watcher::READABLE: + $this->events[$id] = $this->handle->io($watcher->value, \Ev::READ, $this->ioCallback, $watcher); + break; + + case Watcher::WRITABLE: + $this->events[$id] = $this->handle->io($watcher->value, \Ev::WRITE, $this->ioCallback, $watcher); + break; + + case Watcher::DELAY: + case Watcher::REPEAT: + $interval = $watcher->value / self::MILLISEC_PER_SEC; + $this->events[$id] = $this->handle->timer( + $interval, + $watcher->type & Watcher::REPEAT ? $interval : 0, + $this->timerCallback, + $watcher + ); + break; + + case Watcher::SIGNAL: + $this->events[$id] = $this->handle->signal($watcher->value, $this->signalCallback, $watcher); + break; + + default: + throw new \DomainException("Unknown watcher type"); + } + } else { + $this->events[$id]->start(); + } + + if ($watcher->type === Watcher::SIGNAL) { + $this->signals[$id] = $this->events[$id]; + } + } + } + + /** + * {@inheritdoc} + */ + protected function deactivate(Watcher $watcher) { + if (isset($this->events[$id = $watcher->id])) { + $this->events[$id]->stop(); + if ($watcher->type === Watcher::SIGNAL) { + unset($this->signals[$id]); + } + } + } + + /** + * {@inheritdoc} + */ + public function cancel($watcherIdentifier) { + parent::cancel($watcherIdentifier); + unset($this->events[$watcherIdentifier]); + } + + /** + * {@inheritdoc} + */ + public function getHandle() { + return $this->handle; + } +} diff --git a/lib/EventLoop.php b/lib/EventLoop.php new file mode 100644 index 0000000..0f08eca --- /dev/null +++ b/lib/EventLoop.php @@ -0,0 +1,213 @@ +handle = new \EventBase; + + if (self::$activeSignals === null) { + self::$activeSignals = &$this->signals; + } + + $this->ioCallback = function ($resource, $what, Watcher $watcher) { + $callback = $watcher->callback; + $callback($watcher->id, $watcher->value, $watcher->data); + }; + + $this->timerCallback = function ($resource, $what, Watcher $watcher) { + if ($watcher->type & Watcher::DELAY) { + $this->cancel($watcher->id); + } + + $callback = $watcher->callback; + $callback($watcher->id, $watcher->data); + }; + + $this->signalCallback = function ($signum, $what, Watcher $watcher) { + $callback = $watcher->callback; + $callback($watcher->id, $watcher->value, $watcher->data); + }; + } + + public function __destruct() { + foreach ($this->events as $event) { + $event->free(); + } + } + + /** + * {@inheritdoc} + */ + public function run() { + $active = self::$activeSignals; + + foreach ($active as $event) { + $event->del(); + } + + self::$activeSignals = &$this->signals; + + foreach ($this->signals as $event) { + $event->add(); + } + + try { + parent::run(); + } finally { + foreach ($this->signals as $event) { + $event->del(); + } + + self::$activeSignals = &$active; + + foreach ($active as $event) { + $event->add(); + } + } + } + + /** + * {@inheritdoc} + */ + public function stop() { + $this->handle->stop(); + parent::stop(); + } + + /** + * {@inheritdoc} + */ + protected function dispatch($blocking) { + $this->handle->loop($blocking ? \EventBase::LOOP_ONCE : \EventBase::LOOP_ONCE | \EventBase::LOOP_NONBLOCK); + } + + /** + * {@inheritdoc} + */ + protected function activate(array $watchers) { + foreach ($watchers as $watcher) { + if (!isset($this->events[$id = $watcher->id])) { + switch ($watcher->type) { + case Watcher::READABLE: + $this->events[$id] = new \Event( + $this->handle, + $watcher->value, + \Event::READ | \Event::PERSIST, + $this->ioCallback, + $watcher + ); + break; + + case Watcher::WRITABLE: + $this->events[$id] = new \Event( + $this->handle, + $watcher->value, + \Event::WRITE | \Event::PERSIST, + $this->ioCallback, + $watcher + ); + break; + + case Watcher::DELAY: + case Watcher::REPEAT: + $this->events[$id] = new \Event( + $this->handle, + -1, + \Event::TIMEOUT | \Event::PERSIST, + $this->timerCallback, + $watcher + ); + break; + + case Watcher::SIGNAL: + $this->events[$id] = new \Event( + $this->handle, + $watcher->value, + \Event::SIGNAL | \Event::PERSIST, + $this->signalCallback, + $watcher + ); + break; + + default: + throw new \DomainException("Unknown watcher type"); + } + } + + switch ($watcher->type) { + case Watcher::DELAY: + case Watcher::REPEAT: + $this->events[$id]->add($watcher->value / self::MILLISEC_PER_SEC); + break; + + case Watcher::SIGNAL: + $this->signals[$id] = $this->events[$id]; + // No break + + default: + $this->events[$id]->add(); + break; + } + } + } + + /** + * {@inheritdoc} + */ + protected function deactivate(Watcher $watcher) { + if (isset($this->events[$id = $watcher->id])) { + $this->events[$id]->del(); + + if ($watcher->type === Watcher::SIGNAL) { + unset($this->signals[$id]); + } + } + } + + /** + * {@inheritdoc} + */ + public function cancel($watcherIdentifier) { + parent::cancel($watcherIdentifier); + + if (isset($this->events[$watcherIdentifier])) { + $this->events[$watcherIdentifier]->free(); + unset($this->events[$watcherIdentifier]); + } + } + + /** + * {@inheritdoc} + */ + public function getHandle() { + return $this->handle; + } +} diff --git a/lib/Internal/Watcher.php b/lib/Internal/Watcher.php new file mode 100644 index 0000000..7903347 --- /dev/null +++ b/lib/Internal/Watcher.php @@ -0,0 +1,43 @@ +running; + ++$this->running; + + try { + while ($this->running > $previous) { + if ($this->isEmpty()) { + return; + } + $this->tick(); + } + } finally { + $this->running = $previous; + } + } + + /** + * {@inheritdoc} + */ + public function stop() { + --$this->running > 0 ?: $this->running = 0; + } + + /** + * @return bool True if no enabled and referenced watchers remain in the loop. + */ + private function isEmpty() { + foreach ($this->watchers as $watcher) { + if ($watcher->enabled && $watcher->referenced) { + return false; + } + } + + return true; + } + + /** + * Executes a single tick of the event loop. + */ + private function tick() { + $this->deferQueue = \array_merge($this->deferQueue, $this->nextTickQueue); + $this->nextTickQueue = []; + + $this->activate($this->enableQueue); + $this->enableQueue = []; + + try { + foreach ($this->deferQueue as $watcher) { + if (!isset($this->deferQueue[$watcher->id])) { + continue; // Watcher disabled by another defer watcher. + } + + unset($this->watchers[$watcher->id], $this->deferQueue[$watcher->id]); + + $callback = $watcher->callback; + $callback($watcher->id, $watcher->data); + } + + $this->dispatch(empty($this->nextTickQueue) && empty($this->enableQueue) && $this->running); + + } catch (\Throwable $exception) { + if (null === $this->errorHandler) { + throw $exception; + } + + $errorHandler = $this->errorHandler; + $errorHandler($exception); + } catch (\Exception $exception) { // @todo Remove when PHP 5.x support is no longer needed. + if (null === $this->errorHandler) { + throw $exception; + } + + $errorHandler = $this->errorHandler; + $errorHandler($exception); + } + } + + /** + * Dispatches any pending read/write, timer, and signal events. + * + * @param bool $blocking + */ + abstract protected function dispatch($blocking); + + /** + * Activates (enables) all the given watchers. + * + * @param \Amp\Loop\Internal\Watcher[] $watchers + */ + abstract protected function activate(array $watchers); + + /** + * Deactivates (disables) the given watcher. + * + * @param \Amp\Loop\Internal\Watcher $watcher + */ + abstract protected function deactivate(Watcher $watcher); + + /** + * {@inheritdoc} + */ + public function defer(callable $callback, $data = null) { + $watcher = new Watcher; + $watcher->type = Watcher::DEFER; + $watcher->id = $this->nextId++; + $watcher->callback = $callback; + $watcher->data = $data; + + $this->watchers[$watcher->id] = $watcher; + $this->nextTickQueue[$watcher->id] = $watcher; + + return $watcher->id; + } + + /** + * {@inheritdoc} + */ + public function delay($delay, callable $callback, $data = null) { + $delay = (int) $delay; + + if ($delay < 0) { + throw new \InvalidArgumentException("Delay must be greater than or equal to zero"); + } + + $watcher = new Watcher; + $watcher->type = Watcher::DELAY; + $watcher->id = $this->nextId++; + $watcher->callback = $callback; + $watcher->value = $delay; + $watcher->data = $data; + + $this->watchers[$watcher->id] = $watcher; + $this->enableQueue[$watcher->id] = $watcher; + + return $watcher->id; + } + + /** + * {@inheritdoc} + */ + public function repeat($interval, callable $callback, $data = null) { + $interval = (int) $interval; + + if ($interval < 0) { + throw new \InvalidArgumentException("Interval must be greater than or equal to zero"); + } + + $watcher = new Watcher; + $watcher->type = Watcher::REPEAT; + $watcher->id = $this->nextId++; + $watcher->callback = $callback; + $watcher->value = $interval; + $watcher->data = $data; + + $this->watchers[$watcher->id] = $watcher; + $this->enableQueue[$watcher->id] = $watcher; + + return $watcher->id; + } + + /** + * {@inheritdoc} + */ + public function onReadable($stream, callable $callback, $data = null) { + $watcher = new Watcher; + $watcher->type = Watcher::READABLE; + $watcher->id = $this->nextId++; + $watcher->callback = $callback; + $watcher->value = $stream; + $watcher->data = $data; + + $this->watchers[$watcher->id] = $watcher; + $this->enableQueue[$watcher->id] = $watcher; + + return $watcher->id; + } + + /** + * {@inheritdoc} + */ + public function onWritable($stream, callable $callback, $data = null) { + $watcher = new Watcher; + $watcher->type = Watcher::WRITABLE; + $watcher->id = $this->nextId++; + $watcher->callback = $callback; + $watcher->value = $stream; + $watcher->data = $data; + + $this->watchers[$watcher->id] = $watcher; + $this->enableQueue[$watcher->id] = $watcher; + + return $watcher->id; + } + + /** + * {@inheritdoc} + * + * @throws \AsyncInterop\Loop\UnsupportedFeatureException If the pcntl extension is not available. + * @throws \RuntimeException If creating the backend signal handler fails. + */ + public function onSignal($signo, callable $callback, $data = null) { + $watcher = new Watcher; + $watcher->type = Watcher::SIGNAL; + $watcher->id = $this->nextId++; + $watcher->callback = $callback; + $watcher->value = $signo; + $watcher->data = $data; + + $this->watchers[$watcher->id] = $watcher; + $this->enableQueue[$watcher->id] = $watcher; + + return $watcher->id; + } + + /** + * {@inheritdoc} + */ + public function enable($watcherIdentifier) { + if (!isset($this->watchers[$watcherIdentifier])) { + throw new InvalidWatcherException($watcherIdentifier, "Cannot enable an invalid watcher identifier: '{$watcherIdentifier}'"); + } + + $watcher = $this->watchers[$watcherIdentifier]; + + if ($watcher->enabled) { + return; // Watcher already enabled. + } + + $watcher->enabled = true; + + switch ($watcher->type) { + case Watcher::DEFER: + $this->nextTickQueue[$watcher->id] = $watcher; + break; + + default: + $this->enableQueue[$watcher->id] = $watcher; + break; + } + } + + /** + * {@inheritdoc} + */ + public function disable($watcherIdentifier) { + if (!isset($this->watchers[$watcherIdentifier])) { + return; + } + + $watcher = $this->watchers[$watcherIdentifier]; + + if (!$watcher->enabled) { + return; // Watcher already disabled. + } + + $watcher->enabled = false; + $id = $watcher->id; + + switch ($watcher->type) { + case Watcher::DEFER: + if (isset($this->nextTickQueue[$id])) { + // Watcher was only queued to be enabled. + unset($this->nextTickQueue[$id]); + } else { + unset($this->deferQueue[$id]); + } + break; + + default: + if (isset($this->enableQueue[$id])) { + // Watcher was only queued to be enabled. + unset($this->enableQueue[$id]); + } else { + $this->deactivate($watcher); + } + break; + } + } + + /** + * {@inheritdoc} + */ + public function cancel($watcherIdentifier) { + $this->disable($watcherIdentifier); + unset($this->watchers[$watcherIdentifier]); + } + + /** + * {@inheritdoc} + */ + public function reference($watcherIdentifier) { + if (!isset($this->watchers[$watcherIdentifier])) { + throw new InvalidWatcherException($watcherIdentifier, "Cannot reference an invalid watcher identifier: '{$watcherIdentifier}'"); + } + + $this->watchers[$watcherIdentifier]->referenced = true; + } + + /** + * {@inheritdoc} + */ + public function unreference($watcherIdentifier) { + if (!isset($this->watchers[$watcherIdentifier])) { + throw new InvalidWatcherException($watcherIdentifier, "Cannot unreference an invalid watcher identifier: '{$watcherIdentifier}'"); + } + + $this->watchers[$watcherIdentifier]->referenced = false; + } + + /** + * {@inheritdoc} + */ + public function setErrorHandler(callable $callback = null) { + $previous = $this->errorHandler; + $this->errorHandler = $callback; + return $previous; + } + + /** + * {@inheritdoc} + */ + public function getInfo() { + $watchers = [ + "referenced" => 0, + "unreferenced" => 0, + ]; + + $defer = $delay = $repeat = $onReadable = $onWritable = $onSignal = [ + "enabled" => 0, + "disabled" => 0, + ]; + + foreach ($this->watchers as $watcher) { + switch ($watcher->type) { + case Watcher::READABLE: $array = &$onReadable; break; + case Watcher::WRITABLE: $array = &$onWritable; break; + case Watcher::SIGNAL: $array = &$onSignal; break; + case Watcher::DEFER: $array = &$defer; break; + case Watcher::DELAY: $array = &$delay; break; + case Watcher::REPEAT: $array = &$repeat; break; + + default: throw new \DomainException("Unknown watcher type"); + } + + if ($watcher->enabled) { + ++$array["enabled"]; + + if ($watcher->referenced) { + ++$watchers["referenced"]; + } else { + ++$watchers["unreferenced"]; + } + } else { + ++$array["disabled"]; + } + } + + return [ + "watchers" => $watchers, + "defer" => $defer, + "delay" => $delay, + "repeat" => $repeat, + "on_readable" => $onReadable, + "on_writable" => $onWritable, + "on_signal" => $onSignal, + "running" => (bool) $this->running, + ]; + } + + /** + * Returns the same array of data as getInfo(). + * + * @return array + */ + public function __debugInfo() { + return $this->getInfo(); + } +} diff --git a/src/Loop/Driver.php b/lib/Loop/Driver.php similarity index 100% rename from src/Loop/Driver.php rename to lib/Loop/Driver.php diff --git a/src/Loop/DriverFactory.php b/lib/Loop/DriverFactory.php similarity index 100% rename from src/Loop/DriverFactory.php rename to lib/Loop/DriverFactory.php diff --git a/src/Loop/InvalidWatcherException.php b/lib/Loop/InvalidWatcherException.php similarity index 100% rename from src/Loop/InvalidWatcherException.php rename to lib/Loop/InvalidWatcherException.php diff --git a/src/Loop/UnsupportedFeatureException.php b/lib/Loop/UnsupportedFeatureException.php similarity index 100% rename from src/Loop/UnsupportedFeatureException.php rename to lib/Loop/UnsupportedFeatureException.php diff --git a/lib/LoopFactory.php b/lib/LoopFactory.php new file mode 100644 index 0000000..ea09f23 --- /dev/null +++ b/lib/LoopFactory.php @@ -0,0 +1,29 @@ +timerQueue = new \SplPriorityQueue(); + $this->signalHandling = \extension_loaded("pcntl"); + } + + protected function dispatch($blocking) { + $this->selectStreams( + $this->readStreams, + $this->writeStreams, + $blocking ? $this->getTimeout() : 0 + ); + + if (!empty($this->timerExpires)) { + $time = (int) (\microtime(true) * self::MILLISEC_PER_SEC); + + while (!$this->timerQueue->isEmpty()) { + list($watcher, $expiration) = $this->timerQueue->top(); + + $id = $watcher->id; + + if (!isset($this->timerExpires[$id]) || $expiration !== $this->timerExpires[$id]) { + $this->timerQueue->extract(); // Timer was removed from queue. + continue; + } + + if ($this->timerExpires[$id] > $time) { // Timer at top of queue has not expired. + break; + } + + $this->timerQueue->extract(); + + if ($watcher->type & Watcher::REPEAT) { + $this->activate([$watcher]); + } else { + $this->cancel($id); + } + + // Execute the timer. + $callback = $watcher->callback; + $callback($id, $watcher->data); + } + } + + if ($this->signalHandling) { + \pcntl_signal_dispatch(); + } + } + + /** + * @param resource[] $read + * @param resource[] $write + * @param int $timeout + */ + private function selectStreams(array $read, array $write, $timeout) { + $timeout /= self::MILLISEC_PER_SEC; + + if (!empty($read) || !empty($write)) { // Use stream_select() if there are any streams in the loop. + if ($timeout >= 0) { + $seconds = (int) $timeout; + $microseconds = (int) (($timeout - $seconds) * self::MICROSEC_PER_SEC); + } else { + $seconds = null; + $microseconds = null; + } + + $except = null; + + // Error reporting suppressed since stream_select() emits an E_WARNING if it is interrupted by a signal. + $count = @\stream_select($read, $write, $except, $seconds, $microseconds); + + if ($count) { + foreach ($read as $stream) { + $streamId = (int) $stream; + if (isset($this->readWatchers[$streamId])) { + foreach ($this->readWatchers[$streamId] as $watcher) { + if (!isset($this->readWatchers[$streamId][$watcher->id])) { + continue; // Watcher disabled by another IO watcher. + } + + $callback = $watcher->callback; + $callback($watcher->id, $stream, $watcher->data); + } + } + } + + foreach ($write as $stream) { + $streamId = (int) $stream; + if (isset($this->writeWatchers[$streamId])) { + foreach ($this->writeWatchers[$streamId] as $watcher) { + if (!isset($this->writeWatchers[$streamId][$watcher->id])) { + continue; // Watcher disabled by another IO watcher. + } + + $callback = $watcher->callback; + $callback($watcher->id, $stream, $watcher->data); + } + } + } + } + + return; + } + + if ($timeout > 0) { // Otherwise sleep with usleep() if $timeout > 0. + \usleep($timeout * self::MICROSEC_PER_SEC); + } + } + + /** + * @return int Milliseconds until next timer expires or -1 if there are no pending times. + */ + private function getTimeout() { + while (!$this->timerQueue->isEmpty()) { + list($watcher, $expiration) = $this->timerQueue->top(); + + $id = $watcher->id; + + if (!isset($this->timerExpires[$id]) || $expiration !== $this->timerExpires[$id]) { + $this->timerQueue->extract(); // Timer was removed from queue. + continue; + } + + $expiration -= (int) (\microtime(true) * self::MILLISEC_PER_SEC); + + if ($expiration < 0) { + return 0; + } + + return $expiration; + } + + return -1; + } + + /** + * {@inheritdoc} + * + * @throws \AsyncInterop\Loop\UnsupportedFeatureException If the pcntl extension is not available. + * @throws \RuntimeException If creating the backend signal handler fails. + */ + public function onSignal($signo, callable $callback, $data = null) { + if (!$this->signalHandling) { + throw new UnsupportedFeatureException("Signal handling requires the pcntl extension"); + } + + return parent::onSignal($signo, $callback, $data); + } + + /** + * {@inheritdoc} + */ + protected function activate(array $watchers) { + foreach ($watchers as $watcher) { + switch ($watcher->type) { + case Watcher::READABLE: + $streamId = (int) $watcher->value; + $this->readWatchers[$streamId][$watcher->id] = $watcher; + $this->readStreams[$streamId] = $watcher->value; + break; + + case Watcher::WRITABLE: + $streamId = (int) $watcher->value; + $this->writeWatchers[$streamId][$watcher->id] = $watcher; + $this->writeStreams[$streamId] = $watcher->value; + break; + + case Watcher::DELAY: + case Watcher::REPEAT: + $expiration = (int) (\microtime(true) * self::MILLISEC_PER_SEC) + $watcher->value; + $this->timerExpires[$watcher->id] = $expiration; + $this->timerQueue->insert([$watcher, $expiration], -$expiration); + break; + + case Watcher::SIGNAL: + if (!isset($this->signalWatchers[$watcher->value])) { + if (!@\pcntl_signal($watcher->value, [$this, 'handleSignal'])) { + throw new \RuntimeException("Failed to register signal handler"); + } + } + + $this->signalWatchers[$watcher->value][$watcher->id] = $watcher; + break; + + default: + throw new \DomainException("Unknown watcher type"); + } + } + } + + /** + * {@inheritdoc} + */ + protected function deactivate(Watcher $watcher) { + switch ($watcher->type) { + case Watcher::READABLE: + $streamId = (int) $watcher->value; + unset($this->readWatchers[$streamId][$watcher->id]); + if (empty($this->readWatchers[$streamId])) { + unset($this->readWatchers[$streamId], $this->readStreams[$streamId]); + } + break; + + case Watcher::WRITABLE: + $streamId = (int) $watcher->value; + unset($this->writeWatchers[$streamId][$watcher->id]); + if (empty($this->writeWatchers[$streamId])) { + unset($this->writeWatchers[$streamId], $this->writeStreams[$streamId]); + } + break; + + case Watcher::DELAY: + case Watcher::REPEAT: + unset($this->timerExpires[$watcher->id]); + break; + + case Watcher::SIGNAL: + if (isset($this->signalWatchers[$watcher->value])) { + unset($this->signalWatchers[$watcher->value][$watcher->id]); + + if (empty($this->signalWatchers[$watcher->value])) { + unset($this->signalWatchers[$watcher->value]); + @\pcntl_signal($watcher->value, \SIG_DFL); + } + } + break; + + default: throw new \DomainException("Unknown watcher type"); + } + } + + /** + * @param int $signo + */ + private function handleSignal($signo) { + foreach ($this->signalWatchers[$signo] as $watcher) { + if (!isset($this->signalWatchers[$signo][$watcher->id])) { + continue; + } + + $callback = $watcher->callback; + $callback($watcher->id, $signo, $watcher->data); + } + } + + /** + * {@inheritdoc} + */ + public function getHandle() { + return null; + } +} diff --git a/lib/UvLoop.php b/lib/UvLoop.php new file mode 100644 index 0000000..febe673 --- /dev/null +++ b/lib/UvLoop.php @@ -0,0 +1,255 @@ +handle = \uv_loop_new(); + + $this->ioCallback = function ($event, $status, $events, $resource) { + switch ($status) { + case 0: // OK + break; + + // If $status is a severe error, stop the poll and throw an exception. + case \UV::EACCES: + case \UV::EBADF: + case \UV::EINVAL: + case \UV::ENOTSOCK: + throw new \RuntimeException( + \sprintf("UV_%s: %s", \uv_err_name($status), \ucfirst(\uv_strerror($status))) + ); + + default: // Ignore other (probably) trivial warnings and continuing polling. + return; + } + + $watchers = $this->watchers[(int) $event]; + + foreach ($watchers as $watcher) { + $callback = $watcher->callback; + $callback($watcher->id, $resource, $watcher->data); + } + }; + + $this->timerCallback = function ($event) { + $watcher = $this->watchers[(int) $event]; + + if ($watcher->type & Watcher::DELAY) { + $this->cancel($watcher->id); + } + + $callback = $watcher->callback; + $callback($watcher->id, $watcher->data); + }; + + $this->signalCallback = function ($event, $signo) { + $watcher = $this->watchers[(int) $event]; + + $callback = $watcher->callback; + $callback($watcher->id, $signo, $watcher->data); + }; + } + + /** + * {@inheritdoc} + */ + protected function dispatch($blocking) { + \uv_run($this->handle, $blocking ? \UV::RUN_ONCE : \UV::RUN_NOWAIT); + } + + /** + * {@inheritdoc} + */ + protected function activate(array $watchers) { + foreach ($watchers as $watcher) { + $id = $watcher->id; + + switch ($watcher->type) { + case Watcher::READABLE: + $streamId = (int) $watcher->value; + + if (isset($this->read[$streamId])) { + $event = $this->read[$streamId]; + } elseif (isset($this->events[$id])) { + $event = $this->read[$streamId] = $this->events[$id]; + } else { + $event = $this->read[$streamId] = \uv_poll_init_socket($this->handle, $watcher->value); + } + + $this->events[$id] = $event; + $this->watchers[(int) $event][$id] = $watcher; + + if (!\uv_is_active($event)) { + \uv_poll_start($event, \UV::READABLE, $this->ioCallback); + } + break; + + case Watcher::WRITABLE: + $streamId = (int) $watcher->value; + + if (isset($this->write[$streamId])) { + $event = $this->write[$streamId]; + } elseif (isset($this->events[$id])) { + $event = $this->write[$streamId] = $this->events[$id]; + } else { + $event = $this->write[$streamId] = \uv_poll_init_socket($this->handle, $watcher->value); + } + + $this->events[$id] = $event; + $this->watchers[(int) $event][$id] = $watcher; + + + if (!\uv_is_active($event)) { + \uv_poll_start($event, \UV::WRITABLE, $this->ioCallback); + } + break; + + case Watcher::DELAY: + case Watcher::REPEAT: + if (isset($this->events[$id])) { + $event = $this->events[$id]; + } else { + $event = $this->events[$id] = \uv_timer_init($this->handle); + } + + $this->watchers[(int) $event] = $watcher; + + \uv_timer_start( + $event, + $watcher->value, + $watcher->type & Watcher::REPEAT ? $watcher->value : 0, + $this->timerCallback + ); + break; + + case Watcher::SIGNAL: + if (isset($this->events[$id])) { + $event = $this->events[$id]; + } else { + $event = $this->events[$id] = \uv_signal_init($this->handle); + } + + $this->watchers[(int) $event] = $watcher; + + \uv_signal_start($event, $this->signalCallback, $watcher->value); + break; + + default: throw new \DomainException("Unknown watcher type"); + } + } + } + + /** + * {@inheritdoc} + */ + protected function deactivate(Watcher $watcher) { + $id = $watcher->id; + + if (!isset($this->events[$id])) { + return; + } + + $event = $this->events[$id]; + $eventId = (int) $event; + + switch ($watcher->type) { + case Watcher::READABLE: + unset($this->watchers[$eventId][$id]); + + if (empty($this->watchers[$eventId])) { + unset($this->watchers[$eventId]); + unset($this->read[(int) $watcher->value]); + if (\uv_is_active($event)) { + \uv_poll_stop($event); + } + } + break; + + case Watcher::WRITABLE: + unset($this->watchers[$eventId][$id]); + + if (empty($this->watchers[$eventId])) { + unset($this->watchers[$eventId]); + unset($this->write[(int) $watcher->value]); + if (\uv_is_active($event)) { + \uv_poll_stop($event); + } + } + break; + + case Watcher::DELAY: + case Watcher::REPEAT: + unset($this->watchers[$eventId]); + if (\uv_is_active($event)) { + \uv_timer_stop($event); + } + break; + + case Watcher::SIGNAL: + unset($this->watchers[$eventId]); + if (\uv_is_active($event)) { + \uv_signal_stop($event); + } + break; + + default: throw new \DomainException("Unknown watcher type"); + } + } + + /** + * {@inheritdoc} + */ + public function cancel($watcherIdentifier) { + parent::cancel($watcherIdentifier); + + if (!isset($this->events[$watcherIdentifier])) { + return; + } + + $event = $this->events[$watcherIdentifier]; + + if (empty($this->watchers[(int) $event])) { + \uv_close($event); + } + + unset($this->events[$watcherIdentifier]); + } + + /** + * {@inheritdoc} + */ + public function getHandle() { + return $this->handle; + } +} diff --git a/phpunit.xml b/phpunit.xml deleted file mode 100644 index b2dbf88..0000000 --- a/phpunit.xml +++ /dev/null @@ -1,12 +0,0 @@ - - - - ./test - - - - - ./src - - - diff --git a/phpunit.xml.dist b/phpunit.xml.dist index 9ef7f5a..8e29216 100644 --- a/phpunit.xml.dist +++ b/phpunit.xml.dist @@ -1,21 +1,24 @@ test + + test/phpt + @@ -24,5 +27,6 @@ + - \ No newline at end of file + diff --git a/test/EvLoopTest.php b/test/EvLoopTest.php new file mode 100644 index 0000000..2ec7716 --- /dev/null +++ b/test/EvLoopTest.php @@ -0,0 +1,21 @@ +getMockBuilder(DriverFactory::class)->getMock(); + + $factory->method('create') + ->willReturn(new EvLoop); + + return $factory; + } +} diff --git a/test/EventLoopTest.php b/test/EventLoopTest.php new file mode 100644 index 0000000..db6da7b --- /dev/null +++ b/test/EventLoopTest.php @@ -0,0 +1,21 @@ +getMockBuilder(DriverFactory::class)->getMock(); + + $factory->method('create') + ->willReturn(new EventLoop); + + return $factory; + } +} diff --git a/test/NativeLoopTest.php b/test/NativeLoopTest.php new file mode 100644 index 0000000..78dd958 --- /dev/null +++ b/test/NativeLoopTest.php @@ -0,0 +1,18 @@ +getMockBuilder(DriverFactory::class)->getMock(); + + $factory->method('create') + ->willReturn(new NativeLoop()); + + return $factory; + } +} diff --git a/test/UvLoopTest.php b/test/UvLoopTest.php new file mode 100644 index 0000000..fecb785 --- /dev/null +++ b/test/UvLoopTest.php @@ -0,0 +1,21 @@ +getMockBuilder(DriverFactory::class)->getMock(); + + $factory->method('create') + ->willReturn(new UvLoop); + + return $factory; + } +}