1
0
mirror of https://github.com/danog/amp.git synced 2025-01-22 13:21:16 +01:00
amp/lib/NativeLoop.php

678 lines
20 KiB
PHP
Raw Normal View History

2016-05-19 11:12:35 -05:00
<?php
namespace Amp\Loop;
2016-05-25 23:09:53 -05:00
use Amp\Loop\Internal\Watcher;
use Interop\Async\Loop\Driver;
use Interop\Async\Loop\InvalidWatcherException;
use Interop\Async\Loop\Registry;
use Interop\Async\Loop\UnsupportedFeatureException;
2016-05-19 11:12:35 -05:00
class NativeLoop implements Driver {
use Registry;
2016-05-19 11:12:35 -05:00
const MILLISEC_PER_SEC = 1e3;
const MICROSEC_PER_SEC = 1e6;
/**
* @var string
*/
private $nextId = "a";
2016-05-19 11:12:35 -05:00
/**
* @var \Amp\Loop\Internal\Watcher[]
*/
private $watchers = [];
2016-06-07 12:24:53 -05:00
/**
* @var \Amp\Loop\Internal\Watcher[]
*/
private $enableQueue = [];
2016-05-19 11:12:35 -05:00
/**
* @var \Amp\Loop\Internal\Watcher[]
2016-05-19 11:12:35 -05:00
*/
private $deferQueue = [];
/**
* @var resource[]
*/
private $readStreams = [];
/**
* @var \Amp\Loop\Internal\Watcher[][]
2016-05-19 11:12:35 -05:00
*/
private $readWatchers = [];
/**
* @var resource[]
*/
private $writeStreams = [];
/**
* @var \Amp\Loop\Internal\Watcher[][]
2016-05-19 11:12:35 -05:00
*/
private $writeWatchers = [];
/**
* @var int[]
*/
private $timerExpires = [];
/**
* @var \SplPriorityQueue
*/
private $timerQueue;
/**
* @var \Amp\Loop\Internal\Watcher[][]
2016-05-19 11:12:35 -05:00
*/
private $signalWatchers = [];
/**
* @var callable
*/
private $errorHandler;
/**
* @var int
2016-05-19 11:12:35 -05:00
*/
private $running = 0;
2016-05-19 11:12:35 -05:00
/**
* @var bool
*/
private $signalHandling;
/**
 * Creates the timer priority queue and records whether signal watchers can be supported.
 */
public function __construct() {
    // Signal handling is only possible when the pcntl extension is loaded.
    $this->signalHandling = \extension_loaded("pcntl");
    $this->timerQueue = new \SplPriorityQueue();
}
/**
 * {@inheritdoc}
 *
 * Calls may be nested: each call raises the running counter by one and keeps ticking until the
 * counter drops back to its value at entry (see stop()) or no enabled, referenced watcher remains.
 */
public function run() {
    $depth = $this->running;
    $this->running = $depth + 1;

    try {
        while ($this->running > $depth && !$this->isEmpty()) {
            $this->tick();
        }
    } finally {
        // Restore the counter even if a tick threw, so an outer run() sees a consistent depth.
        $this->running = $depth;
    }
}
/**
 * {@inheritdoc}
 *
 * Lowers the running counter by one, which makes the innermost run() leave its loop after the
 * current tick.
 */
public function stop() {
    $this->running -= 1;
}
/**
 * @return bool True if no enabled and referenced watchers remain in the loop.
 */
private function isEmpty() {
    foreach ($this->watchers as $candidate) {
        if (!$candidate->enabled) {
            continue;
        }

        if ($candidate->referenced) {
            return false; // At least one watcher still keeps the loop alive.
        }
    }

    return true;
}
/**
 * Executes a single tick of the event loop.
 *
 * Order within a tick: deferred callbacks, stream selection, expired timers, pending signals, and
 * finally activation of watchers queued for enabling. An exception thrown along the way is passed
 * to the error handler if one is set, otherwise it is rethrown.
 */
private function tick() {
    try {
        if ($this->deferQueue) {
            $this->invokeDeferred();
        }

        // Poll without blocking while watchers are still waiting to be enabled.
        $timeout = $this->enableQueue ? 0 : $this->getTimeout();
        $this->selectStreams($this->readStreams, $this->writeStreams, $timeout);

        if ($this->timerExpires) {
            $this->invokeTimers();
        }

        if ($this->signalHandling) {
            \pcntl_signal_dispatch();
        }

        if ($this->enableQueue) {
            $this->enableWatchers();
        }
    } catch (\Throwable $exception) {
        $handler = $this->errorHandler;

        if (null === $handler) {
            throw $exception;
        }

        $handler($exception);
    } catch (\Exception $exception) { // @todo Remove when PHP 5.x support is no longer needed.
        $handler = $this->errorHandler;

        if (null === $handler) {
            throw $exception;
        }

        $handler($exception);
    }
}
/**
 * Waits for stream activity, or just for the timeout when there are no streams, and invokes the
 * callbacks of the watchers registered on every stream reported as ready.
 *
 * @param resource[] $read Streams to check for readability.
 * @param resource[] $write Streams to check for writability.
 * @param int $timeout Milliseconds to wait. 0 polls without blocking; a negative value makes
 *     stream_select() block until a stream is ready or the call is interrupted.
 */
private function selectStreams(array $read, array $write, $timeout) {
    $timeout /= self::MILLISEC_PER_SEC; // Work in (fractional) seconds from here on.

    if (empty($read) && empty($write)) {
        // No streams in the loop, so there is nothing to select on: sleep until the next timer.
        // NOTE(review): with a negative timeout this returns immediately, so a loop that holds only
        // signal watchers spins through ticks without sleeping. Blocking here would also delay a
        // stop() issued earlier in the same tick, so this is left for a separate change.
        if ($timeout > 0) {
            // usleep() expects an integer; the explicit cast avoids relying on implicit float-to-int
            // conversion, which is deprecated for fractional values as of PHP 8.1.
            \usleep((int) ($timeout * self::MICROSEC_PER_SEC));
        }

        return;
    }

    if ($timeout >= 0) {
        $seconds = (int) $timeout;
        $microseconds = (int) (($timeout - $seconds) * self::MICROSEC_PER_SEC);
    } else {
        // Null seconds tells stream_select() to block indefinitely.
        $seconds = null;
        $microseconds = null;
    }

    $except = null;

    // Error reporting suppressed since stream_select() emits an E_WARNING if it is interrupted by a signal.
    $count = @\stream_select($read, $write, $except, $seconds, $microseconds);

    if (!$count) {
        return; // Timed out, interrupted, or failed: no stream is ready.
    }

    foreach ($read as $stream) {
        $streamId = (int) $stream;

        if (!isset($this->readWatchers[$streamId])) {
            continue;
        }

        foreach ($this->readWatchers[$streamId] as $watcher) {
            if (!isset($this->readWatchers[$streamId][$watcher->id])) {
                continue; // Watcher disabled by another IO watcher.
            }

            $callback = $watcher->callback;
            $callback($watcher->id, $stream, $watcher->data);
        }
    }

    foreach ($write as $stream) {
        $streamId = (int) $stream;

        if (!isset($this->writeWatchers[$streamId])) {
            continue;
        }

        foreach ($this->writeWatchers[$streamId] as $watcher) {
            if (!isset($this->writeWatchers[$streamId][$watcher->id])) {
                continue; // Watcher disabled by another IO watcher.
            }

            $callback = $watcher->callback;
            $callback($watcher->id, $stream, $watcher->data);
        }
    }
}
/**
 * Discards stale entries from the top of the timer queue and reports how long the loop may wait.
 *
 * @return int Milliseconds until the next timer expires (0 if it has already expired) or -1 if
 *     there are no pending timers.
 */
private function getTimeout() {
    while (!$this->timerQueue->isEmpty()) {
        list($timer, $expires) = $this->timerQueue->top();
        $id = $timer->id;

        $isCurrent = isset($this->timerExpires[$id]) && $expires === $this->timerExpires[$id];

        if (!$isCurrent) {
            $this->timerQueue->extract(); // Timer was removed from queue.
            continue;
        }

        $remaining = $expires - (int) (\microtime(true) * self::MILLISEC_PER_SEC);

        return $remaining < 0 ? 0 : $remaining;
    }

    return -1;
}
/**
 * Invokes all pending defer watchers.
 *
 * Each watcher is removed from the loop before its callback runs, so a defer watcher fires once.
 */
private function invokeDeferred() {
    foreach ($this->deferQueue as $queued) {
        $id = $queued->id;

        if (!isset($this->deferQueue[$id])) {
            continue; // Watcher disabled by another defer watcher.
        }

        $deferred = $this->watchers[$id];
        unset($this->watchers[$id], $this->deferQueue[$id]);

        $invoke = $deferred->callback;
        $invoke($deferred->id, $deferred->data);
    }
}
/**
 * Invokes all pending delay and repeat watchers.
 *
 * Expired timers are taken from the queue in expiration order. A delay watcher is removed from the
 * loop before its callback runs; a repeat watcher is queued to be re-armed by enableWatchers().
 */
private function invokeTimers() {
    $time = (int) (\microtime(true) * self::MILLISEC_PER_SEC);

    while (!$this->timerQueue->isEmpty()) {
        list($watcher, $expiration) = $this->timerQueue->top();
        $id = $watcher->id;

        if (!isset($this->timerExpires[$id]) || $expiration !== $this->timerExpires[$id]) {
            $this->timerQueue->extract(); // Timer was removed from queue.
            continue;
        }

        if ($expiration > $time) { // Timer at top of queue has not expired.
            return;
        }

        // Remove and execute timer. Replace timer if persistent.
        $this->timerQueue->extract();

        // The queue entry is gone, so drop its expiration record for both timer kinds. A repeat
        // watcher gets a fresh record when enableWatchers() re-arms it. Keeping the old record would
        // leak it if the callback disables or cancels the watcher while it sits in the enable
        // queue, because disable() returns early in that case.
        unset($this->timerExpires[$id]);

        if ($watcher->type & Watcher::REPEAT) {
            $this->enableQueue[$id] = $watcher;
        } else {
            unset($this->watchers[$id]);
        }

        // Execute the timer.
        $callback = $watcher->callback;
        $callback($watcher->id, $watcher->data);
    }
}
2016-06-07 12:24:53 -05:00
/**
 * Enables any watchers queued to be enabled on the next tick.
 *
 * The queue is emptied first, so watchers enabled while this method runs wait for the next call.
 *
 * @throws \RuntimeException If registering the signal handler for a signal watcher fails.
 * @throws \DomainException If a queued watcher has an unknown type.
 */
private function enableWatchers() {
    $enableQueue = $this->enableQueue;
    $this->enableQueue = [];

    foreach ($enableQueue as $watcher) {
        switch ($watcher->type) {
            case Watcher::READABLE:
                $streamId = (int) $watcher->value;
                $this->readWatchers[$streamId][$watcher->id] = $watcher;
                $this->readStreams[$streamId] = $watcher->value;
                break;

            case Watcher::WRITABLE:
                $streamId = (int) $watcher->value;
                $this->writeWatchers[$streamId][$watcher->id] = $watcher;
                $this->writeStreams[$streamId] = $watcher->value;
                break;

            case Watcher::DELAY:
            case Watcher::REPEAT:
                $priority = (\microtime(true) * self::MILLISEC_PER_SEC) + $watcher->value;
                $expiration = (int) $priority;
                $this->timerExpires[$watcher->id] = $expiration;
                // Negated priority: the earliest expiration ends up on top of the queue.
                $this->timerQueue->insert([$watcher, $expiration], -$priority);
                break;

            case Watcher::DEFER:
                $this->deferQueue[$watcher->id] = $watcher;
                break;

            case Watcher::SIGNAL:
                $signal = $watcher->value;

                // Install one handler per signal number, when its first watcher is enabled.
                if (!isset($this->signalWatchers[$signal])) {
                    $handler = function ($signo) {
                        foreach ($this->signalWatchers[$signo] as $watcher) {
                            // Skip watchers disabled or cancelled by an earlier callback in this
                            // same dispatch, mirroring the check done for IO watchers.
                            if (!isset($this->signalWatchers[$signo][$watcher->id])) {
                                continue;
                            }

                            $callback = $watcher->callback;
                            $callback($watcher->id, $signo, $watcher->data);
                        }
                    };

                    if (!@\pcntl_signal($signal, $handler)) {
                        throw new \RuntimeException("Failed to register signal handler");
                    }
                }

                $this->signalWatchers[$signal][$watcher->id] = $watcher;
                break;

            default:
                throw new \DomainException("Unknown watcher type");
        }
    }
}
2016-05-19 11:12:35 -05:00
/**
 * {@inheritdoc}
 *
 * The watcher is registered immediately but only queued for enabling; enableWatchers() moves it
 * into the defer queue.
 */
public function defer(callable $callback, $data = null) {
    $id = $this->nextId++;

    $entry = new Watcher;
    $entry->type = Watcher::DEFER;
    $entry->id = $id;
    $entry->callback = $callback;
    $entry->data = $data;

    $this->watchers[$id] = $entry;
    $this->enableQueue[$id] = $entry;

    return $id;
}
/**
 * {@inheritdoc}
 *
 * @throws \InvalidArgumentException If the delay, cast to an integer, is negative.
 */
public function delay($delay, callable $callback, $data = null) {
    $milliseconds = (int) $delay;

    if (0 > $milliseconds) {
        throw new \InvalidArgumentException("Delay must be greater than or equal to zero");
    }

    $id = $this->nextId++;

    $entry = new Watcher;
    $entry->type = Watcher::DELAY;
    $entry->id = $id;
    $entry->callback = $callback;
    $entry->value = $milliseconds;
    $entry->data = $data;

    $this->watchers[$id] = $entry;
    $this->enableQueue[$id] = $entry;

    return $id;
}
/**
 * {@inheritdoc}
 *
 * @throws \InvalidArgumentException If the interval, cast to an integer, is negative.
 */
public function repeat($interval, callable $callback, $data = null) {
    $milliseconds = (int) $interval;

    if (0 > $milliseconds) {
        throw new \InvalidArgumentException("Interval must be greater than or equal to zero");
    }

    $id = $this->nextId++;

    $entry = new Watcher;
    $entry->type = Watcher::REPEAT;
    $entry->id = $id;
    $entry->callback = $callback;
    $entry->value = $milliseconds;
    $entry->data = $data;

    $this->watchers[$id] = $entry;
    $this->enableQueue[$id] = $entry;

    return $id;
}
/**
 * {@inheritdoc}
 *
 * The watcher is registered immediately but only queued for enabling; enableWatchers() adds the
 * stream to the set polled for readability.
 */
public function onReadable($stream, callable $callback, $data = null) {
    $id = $this->nextId++;

    $entry = new Watcher;
    $entry->type = Watcher::READABLE;
    $entry->id = $id;
    $entry->callback = $callback;
    $entry->value = $stream;
    $entry->data = $data;

    $this->watchers[$id] = $entry;
    $this->enableQueue[$id] = $entry;

    return $id;
}
/**
 * {@inheritdoc}
 *
 * The watcher is registered immediately but only queued for enabling; enableWatchers() adds the
 * stream to the set polled for writability.
 */
public function onWritable($stream, callable $callback, $data = null) {
    $id = $this->nextId++;

    $entry = new Watcher;
    $entry->type = Watcher::WRITABLE;
    $entry->id = $id;
    $entry->callback = $callback;
    $entry->value = $stream;
    $entry->data = $data;

    $this->watchers[$id] = $entry;
    $this->enableQueue[$id] = $entry;

    return $id;
}
/**
 * {@inheritdoc}
 *
 * The signal handler itself is installed later, when enableWatchers() processes the queued watcher.
 *
 * @throws \Interop\Async\Loop\UnsupportedFeatureException If the pcntl extension is not available.
 * @throws \RuntimeException If creating the backend signal handler fails.
 */
public function onSignal($signo, callable $callback, $data = null) {
    if (!$this->signalHandling) {
        throw new UnsupportedFeatureException("Signal handling requires the pcntl extension");
    }

    $id = $this->nextId++;

    $entry = new Watcher;
    $entry->type = Watcher::SIGNAL;
    $entry->id = $id;
    $entry->callback = $callback;
    $entry->value = $signo;
    $entry->data = $data;

    $this->watchers[$id] = $entry;
    $this->enableQueue[$id] = $entry;

    return $id;
}
/**
* {@inheritdoc}
*
* The callback is stored and later invoked by tick() with any exception thrown while a tick runs.
* Passing null removes the handler, in which case tick() rethrows the exception.
*/
public function setErrorHandler(callable $callback = null) {
$this->errorHandler = $callback;
}
/**
 * {@inheritdoc}
 *
 * Enabling takes effect when enableWatchers() next runs; until then the watcher sits in the enable
 * queue. Enabling an already enabled watcher does nothing.
 *
 * @throws \Interop\Async\Loop\InvalidWatcherException If the identifier is unknown to this loop.
 */
public function enable($watcherIdentifier) {
    if (!isset($this->watchers[$watcherIdentifier])) {
        throw new InvalidWatcherException("Cannot enable an invalid watcher identifier");
    }

    $target = $this->watchers[$watcherIdentifier];

    if (!$target->enabled) {
        $target->enabled = true;
        $this->enableQueue[$target->id] = $target;
    }
}
/**
 * {@inheritdoc}
 *
 * A watcher that is still waiting in the enable queue is simply dequeued. Otherwise it is removed
 * from the structure matching its type; the last watcher of a stream or signal also removes the
 * stream from the select set or resets the signal to its default handler.
 *
 * @throws \Interop\Async\Loop\InvalidWatcherException If the identifier is unknown to this loop.
 * @throws \DomainException If the watcher has an unknown type.
 */
public function disable($watcherIdentifier) {
    if (!isset($this->watchers[$watcherIdentifier])) {
        throw new InvalidWatcherException("Cannot disable an invalid watcher identifier");
    }

    $target = $this->watchers[$watcherIdentifier];

    if (!$target->enabled) {
        return; // Watcher already disabled.
    }

    $target->enabled = false;
    $id = $target->id;

    if (isset($this->enableQueue[$id])) {
        unset($this->enableQueue[$id]);
        return; // Watcher was only queued to be enabled.
    }

    switch ($target->type) {
        case Watcher::READABLE:
            $streamId = (int) $target->value;
            unset($this->readWatchers[$streamId][$id]);

            if (empty($this->readWatchers[$streamId])) {
                // Last read watcher on this stream: stop polling it.
                unset($this->readWatchers[$streamId], $this->readStreams[$streamId]);
            }
            break;

        case Watcher::WRITABLE:
            $streamId = (int) $target->value;
            unset($this->writeWatchers[$streamId][$id]);

            if (empty($this->writeWatchers[$streamId])) {
                // Last write watcher on this stream: stop polling it.
                unset($this->writeWatchers[$streamId], $this->writeStreams[$streamId]);
            }
            break;

        case Watcher::DELAY:
        case Watcher::REPEAT:
            // The queue entry stays behind and is discarded as stale once it reaches the top.
            unset($this->timerExpires[$id]);
            break;

        case Watcher::DEFER:
            unset($this->deferQueue[$id]);
            break;

        case Watcher::SIGNAL:
            $signo = $target->value;

            if (!isset($this->signalWatchers[$signo])) {
                break;
            }

            unset($this->signalWatchers[$signo][$id]);

            if (empty($this->signalWatchers[$signo])) {
                // Last watcher for this signal: restore the default signal behavior.
                unset($this->signalWatchers[$signo]);
                @\pcntl_signal($signo, \SIG_DFL);
            }
            break;

        default:
            throw new \DomainException("Unknown watcher type");
    }
}
/**
 * {@inheritdoc}
 *
 * Unknown identifiers are ignored, so cancelling the same watcher twice is harmless.
 */
public function cancel($watcherIdentifier) {
    // Guard first: disable() would throw for an identifier that is not registered.
    if (isset($this->watchers[$watcherIdentifier])) {
        $this->disable($watcherIdentifier);
        unset($this->watchers[$watcherIdentifier]);
    }
}
/**
 * {@inheritdoc}
 *
 * @throws \Interop\Async\Loop\InvalidWatcherException If the identifier is unknown to this loop.
 */
public function reference($watcherIdentifier) {
    if (isset($this->watchers[$watcherIdentifier])) {
        $this->watchers[$watcherIdentifier]->referenced = true;
        return;
    }

    throw new InvalidWatcherException("Cannot reference an invalid watcher identifier");
}
/**
 * {@inheritdoc}
 *
 * An unreferenced watcher no longer counts when run() decides whether the loop is empty.
 *
 * @throws \Interop\Async\Loop\InvalidWatcherException If the identifier is unknown to this loop.
 */
public function unreference($watcherIdentifier) {
    if (isset($this->watchers[$watcherIdentifier])) {
        $this->watchers[$watcherIdentifier]->referenced = false;
        return;
    }

    throw new InvalidWatcherException("Cannot unreference an invalid watcher identifier");
}
/**
 * {@inheritdoc}
 *
 * Counts enabled and disabled watchers per type. The "watchers" entry splits the enabled watchers
 * into referenced and unreferenced ones; disabled watchers are not included there.
 *
 * @throws \DomainException If a registered watcher has an unknown type.
 */
public function info() {
    $blank = [
        "enabled" => 0,
        "disabled" => 0,
    ];

    $perType = [
        "defer" => $blank,
        "delay" => $blank,
        "repeat" => $blank,
        "on_readable" => $blank,
        "on_writable" => $blank,
        "on_signal" => $blank,
    ];

    $watchers = [
        "referenced" => 0,
        "unreferenced" => 0,
    ];

    foreach ($this->watchers as $watcher) {
        switch ($watcher->type) {
            case Watcher::READABLE: $key = "on_readable"; break;
            case Watcher::WRITABLE: $key = "on_writable"; break;
            case Watcher::SIGNAL:   $key = "on_signal"; break;
            case Watcher::DEFER:    $key = "defer"; break;
            case Watcher::DELAY:    $key = "delay"; break;
            case Watcher::REPEAT:   $key = "repeat"; break;

            default: throw new \DomainException("Unknown watcher type");
        }

        if (!$watcher->enabled) {
            ++$perType[$key]["disabled"];
            continue;
        }

        ++$perType[$key]["enabled"];
        ++$watchers[$watcher->referenced ? "referenced" : "unreferenced"];
    }

    return [
        "watchers" => $watchers,
        "defer" => $perType["defer"],
        "delay" => $perType["delay"],
        "repeat" => $perType["repeat"],
        "on_readable" => $perType["on_readable"],
        "on_writable" => $perType["on_writable"],
        "on_signal" => $perType["on_signal"],
        "running" => $this->running,
    ];
}
/**
 * {@inheritdoc}
 *
 * This implementation always returns null.
 */
public function getHandle() {
    return null;
}
2016-05-25 23:09:53 -05:00
/**
* Supplies the data shown when the loop object is dumped (for example with var_dump()).
*
* @return array The same array of data as returned by info().
*/
public function __debugInfo() {
return $this->info();
}
}