1
0
mirror of https://github.com/danog/amp.git synced 2025-01-22 13:21:16 +01:00
amp/lib/Loop.php
2017-02-16 22:36:32 -06:00

416 lines
11 KiB
PHP

<?php
namespace Amp\Loop;
use Amp\Loop\Internal\Watcher;
use AsyncInterop\Loop\Driver;
use AsyncInterop\Loop\InvalidWatcherException;
abstract class Loop extends Driver {
// Unit conversion factors. Written as integer literals on purpose: the
// float literals 1e3 / 1e6 would make these constants floats instead of ints.
const MILLISEC_PER_SEC = 1000;
const MICROSEC_PER_SEC = 1000000;
/**
 * Identifier that will be given to the next watcher created. It is advanced
 * with the ++ operator on every registration (PHP string increment).
 *
 * @var string
 */
private $nextId = "a";
/**
 * Every watcher that has been registered and not yet cancelled or (for defer
 * watchers) executed, keyed by watcher identifier.
 *
 * @var \Amp\Loop\Internal\Watcher[]
 */
private $watchers = [];
/**
 * Non-defer watchers waiting to be handed to activate() at the start of the
 * next tick, keyed by watcher identifier.
 *
 * @var \Amp\Loop\Internal\Watcher[]
 */
private $enableQueue = [];
/**
 * Defer watchers whose callbacks are invoked during the current tick, keyed
 * by watcher identifier.
 *
 * @var \Amp\Loop\Internal\Watcher[]
 */
private $deferQueue = [];
/**
 * Defer watchers registered or re-enabled since the last tick began; they are
 * merged into $deferQueue when the next tick starts.
 *
 * @var \Amp\Loop\Internal\Watcher[]
 */
private $nextTickQueue = [];
/**
 * Callback that tick() invokes with an exception thrown during a tick. When
 * null, tick() rethrows the exception instead.
 *
 * @var callable|null
 */
private $errorHandler;
/**
 * Counter incremented by run() and decremented (never below zero) by stop().
 * A non-zero value is reported as "running" by getInfo().
 *
 * @var int
 */
private $running = 0;
/**
 * {@inheritdoc}
 *
 * Executes ticks until either stop() lowers the run counter back to the value
 * it had when this call started, or no enabled and referenced watcher remains.
 * The counter is always restored to its entry value on exit, even when tick()
 * throws.
 */
public function run() {
    // Post-increment yields the counter value from before this call.
    $depthAtEntry = $this->running++;

    try {
        while ($this->running > $depthAtEntry && !$this->isEmpty()) {
            $this->tick();
        }
    } finally {
        $this->running = $depthAtEntry;
    }
}
/**
 * {@inheritdoc}
 *
 * Lowers the run counter by one, clamping it at zero so that calling stop()
 * while the loop is not running leaves the counter at zero.
 */
public function stop() {
    $this->running = \max(0, $this->running - 1);
}
/**
 * Checks whether any registered watcher is both enabled and referenced.
 *
 * @return bool True if no enabled and referenced watchers remain in the loop.
 */
private function isEmpty() {
    foreach ($this->watchers as $candidate) {
        if (!$candidate->enabled || !$candidate->referenced) {
            continue; // Disabled or unreferenced watchers are ignored here.
        }

        return false;
    }

    return true;
}
/**
 * Executes a single tick of the event loop.
 *
 * Order of operations:
 *  1. Defer watchers queued for the next tick are appended to the defer queue.
 *  2. Watchers in the enable queue are passed to activate().
 *  3. Each queued defer callback is invoked once and its watcher is forgotten.
 *  4. dispatch() is called; it is told to block only when nothing new was
 *     queued during this tick and the loop is still running.
 *
 * An exception thrown in steps 3 or 4 ends the tick. It is passed to the error
 * handler if one is set, otherwise it is rethrown.
 */
private function tick() {
    $this->deferQueue = \array_merge($this->deferQueue, $this->nextTickQueue);
    $this->nextTickQueue = [];

    $this->activate($this->enableQueue);
    $this->enableQueue = [];

    try {
        foreach ($this->deferQueue as $deferred) {
            // The loop iterates a snapshot; skip entries an earlier callback
            // in this tick has already removed from the live queue.
            if (isset($this->deferQueue[$deferred->id])) {
                // Defer watchers fire once: drop the watcher before calling it.
                unset($this->watchers[$deferred->id], $this->deferQueue[$deferred->id]);

                $invoke = $deferred->callback;
                $invoke($deferred->id, $deferred->data);
            }
        }

        $mayBlock = empty($this->nextTickQueue) && empty($this->enableQueue) && $this->running;
        $this->dispatch($mayBlock);
    } catch (\Throwable $error) {
        $handler = $this->errorHandler;
        if ($handler === null) {
            throw $error;
        }

        $handler($error);
    } catch (\Exception $error) { // @todo Remove when PHP 5.x support is no longer needed.
        $handler = $this->errorHandler;
        if ($handler === null) {
            throw $error;
        }

        $handler($error);
    }
}
/**
 * Dispatches any pending read/write, timer, and signal events.
 *
 * Called by tick() once per tick, after the queued defer callbacks have run.
 *
 * @param bool $blocking Passed as true by tick() only when the next-tick and
 *     enable queues are both empty and the run counter is non-zero.
 *     NOTE(review): presumably tells the implementation it may wait for events
 *     instead of polling - confirm against the concrete drivers.
 */
abstract protected function dispatch($blocking);
/**
 * Activates (enables) all the given watchers.
 *
 * Called by tick() at the start of every tick with the current contents of
 * the enable queue, which may be an empty array.
 *
 * @param \Amp\Loop\Internal\Watcher[] $watchers Watchers to activate, keyed by
 *     watcher identifier.
 */
abstract protected function activate(array $watchers);
/**
 * Deactivates (disables) the given watcher.
 *
 * Called by disable() for an enabled, non-defer watcher that is no longer
 * waiting in the enable queue.
 *
 * @param \Amp\Loop\Internal\Watcher $watcher Watcher to deactivate; its
 *     enabled flag has already been set to false by the caller.
 */
abstract protected function deactivate(Watcher $watcher);
/**
 * {@inheritdoc}
 *
 * Registers a defer watcher and places it in the next-tick queue, so its
 * callback is invoked by a later tick rather than immediately.
 */
public function defer(callable $callback, $data = null) {
    $deferred = new Watcher;
    $deferred->id = $this->nextId++;
    $deferred->type = Watcher::DEFER;
    $deferred->callback = $callback;
    $deferred->data = $data;

    $id = $deferred->id;
    $this->watchers[$id] = $deferred;
    $this->nextTickQueue[$id] = $deferred;

    return $id;
}
/**
 * {@inheritdoc}
 *
 * The delay is cast to int before validation. The new watcher is placed in
 * the enable queue and is handed to activate() at the start of the next tick.
 *
 * @throws \InvalidArgumentException If the delay is negative after the cast.
 */
public function delay($delay, callable $callback, $data = null) {
    $delay = (int) $delay;

    if (0 > $delay) {
        throw new \InvalidArgumentException("Delay must be greater than or equal to zero");
    }

    $timer = new Watcher;
    $timer->id = $this->nextId++;
    $timer->type = Watcher::DELAY;
    $timer->value = $delay;
    $timer->callback = $callback;
    $timer->data = $data;

    $id = $timer->id;
    $this->watchers[$id] = $timer;
    $this->enableQueue[$id] = $timer;

    return $id;
}
/**
 * {@inheritdoc}
 *
 * The interval is cast to int before validation. The new watcher is placed in
 * the enable queue and is handed to activate() at the start of the next tick.
 *
 * @throws \InvalidArgumentException If the interval is negative after the cast.
 */
public function repeat($interval, callable $callback, $data = null) {
    $interval = (int) $interval;

    if (0 > $interval) {
        throw new \InvalidArgumentException("Interval must be greater than or equal to zero");
    }

    $timer = new Watcher;
    $timer->id = $this->nextId++;
    $timer->type = Watcher::REPEAT;
    $timer->value = $interval;
    $timer->callback = $callback;
    $timer->data = $data;

    $id = $timer->id;
    $this->watchers[$id] = $timer;
    $this->enableQueue[$id] = $timer;

    return $id;
}
/**
 * {@inheritdoc}
 *
 * The stream is stored on the watcher as given (no validation happens here).
 * The new watcher is placed in the enable queue and is handed to activate()
 * at the start of the next tick.
 */
public function onReadable($stream, callable $callback, $data = null) {
    $io = new Watcher;
    $io->id = $this->nextId++;
    $io->type = Watcher::READABLE;
    $io->value = $stream;
    $io->callback = $callback;
    $io->data = $data;

    $id = $io->id;
    $this->watchers[$id] = $io;
    $this->enableQueue[$id] = $io;

    return $id;
}
/**
 * {@inheritdoc}
 *
 * The stream is stored on the watcher as given (no validation happens here).
 * The new watcher is placed in the enable queue and is handed to activate()
 * at the start of the next tick.
 */
public function onWritable($stream, callable $callback, $data = null) {
    $io = new Watcher;
    $io->id = $this->nextId++;
    $io->type = Watcher::WRITABLE;
    $io->value = $stream;
    $io->callback = $callback;
    $io->data = $data;

    $id = $io->id;
    $this->watchers[$id] = $io;
    $this->enableQueue[$id] = $io;

    return $id;
}
/**
 * {@inheritdoc}
 *
 * The signal number is stored on the watcher as given. The new watcher is
 * placed in the enable queue and is handed to activate() at the start of the
 * next tick.
 *
 * NOTE(review): nothing in this method throws the exceptions listed below;
 * presumably they are raised by concrete drivers - confirm.
 *
 * @throws \AsyncInterop\Loop\UnsupportedFeatureException If the pcntl extension is not available.
 * @throws \RuntimeException If creating the backend signal handler fails.
 */
public function onSignal($signo, callable $callback, $data = null) {
    $signal = new Watcher;
    $signal->id = $this->nextId++;
    $signal->type = Watcher::SIGNAL;
    $signal->value = $signo;
    $signal->callback = $callback;
    $signal->data = $data;

    $id = $signal->id;
    $this->watchers[$id] = $signal;
    $this->enableQueue[$id] = $signal;

    return $id;
}
/**
 * {@inheritdoc}
 *
 * Enabling an already enabled watcher does nothing. Otherwise the watcher is
 * flagged as enabled and queued: defer watchers go to the next-tick queue,
 * every other type goes to the enable queue.
 *
 * @throws \AsyncInterop\Loop\InvalidWatcherException If the identifier is unknown.
 */
public function enable($watcherIdentifier) {
    if (!isset($this->watchers[$watcherIdentifier])) {
        throw new InvalidWatcherException($watcherIdentifier, "Cannot enable an invalid watcher identifier: '{$watcherIdentifier}'");
    }

    $target = $this->watchers[$watcherIdentifier];

    if (!$target->enabled) {
        $target->enabled = true;

        if ($target->type === Watcher::DEFER) {
            $this->nextTickQueue[$target->id] = $target;
        } else {
            $this->enableQueue[$target->id] = $target;
        }
    }
}
/**
 * {@inheritdoc}
 *
 * Unknown identifiers and already disabled watchers are ignored. Otherwise
 * the watcher is flagged as disabled and removed from whichever queue holds
 * it; a non-defer watcher that is not in the enable queue is passed to
 * deactivate().
 */
public function disable($watcherIdentifier) {
    if (!isset($this->watchers[$watcherIdentifier])) {
        return;
    }

    $target = $this->watchers[$watcherIdentifier];

    if (!$target->enabled) {
        return; // Nothing to do: the watcher is already disabled.
    }

    $target->enabled = false;
    $id = $target->id;

    if ($target->type === Watcher::DEFER) {
        if (isset($this->nextTickQueue[$id])) {
            // Still waiting for its first tick; just drop it from that queue.
            unset($this->nextTickQueue[$id]);
            return;
        }

        unset($this->deferQueue[$id]);
        return;
    }

    if (isset($this->enableQueue[$id])) {
        // Never reached activate(), so removing it from the queue is enough.
        unset($this->enableQueue[$id]);
        return;
    }

    $this->deactivate($target);
}
/**
 * {@inheritdoc}
 *
 * Disables the watcher and then removes it from the watcher table. Unknown
 * identifiers are ignored: disable() returns early for them and unset() on a
 * missing key is a no-op.
 */
public function cancel($watcherIdentifier) {
    // Disable first so the watcher is removed from its queue or deactivated.
    $this->disable($watcherIdentifier);

    unset($this->watchers[$watcherIdentifier]);
}
/**
 * {@inheritdoc}
 *
 * Marks the watcher as referenced, so it counts towards keeping run() going
 * while it is enabled.
 *
 * @throws \AsyncInterop\Loop\InvalidWatcherException If the identifier is unknown.
 */
public function reference($watcherIdentifier) {
    if (isset($this->watchers[$watcherIdentifier])) {
        $this->watchers[$watcherIdentifier]->referenced = true;
        return;
    }

    throw new InvalidWatcherException($watcherIdentifier, "Cannot reference an invalid watcher identifier: '{$watcherIdentifier}'");
}
/**
 * {@inheritdoc}
 *
 * Marks the watcher as unreferenced, so it no longer counts towards keeping
 * run() going.
 *
 * @throws \AsyncInterop\Loop\InvalidWatcherException If the identifier is unknown.
 */
public function unreference($watcherIdentifier) {
    if (isset($this->watchers[$watcherIdentifier])) {
        $this->watchers[$watcherIdentifier]->referenced = false;
        return;
    }

    throw new InvalidWatcherException($watcherIdentifier, "Cannot unreference an invalid watcher identifier: '{$watcherIdentifier}'");
}
/**
 * {@inheritdoc}
 *
 * Installs the given handler (or removes the current one when null is passed)
 * and returns the handler that was installed before this call, if any.
 */
public function setErrorHandler(callable $callback = null) {
    $replaced = $this->errorHandler;

    $this->errorHandler = $callback;

    return $replaced;
}
/**
 * {@inheritdoc}
 *
 * Counts enabled and disabled watchers per watcher type. Enabled watchers are
 * additionally tallied as referenced or unreferenced under the "watchers" key;
 * disabled watchers are not included in that tally.
 *
 * @throws \DomainException If a registered watcher has an unrecognised type.
 */
public function getInfo() {
    // Declaration order here determines the key order of the returned array.
    $perType = [
        "defer" => ["enabled" => 0, "disabled" => 0],
        "delay" => ["enabled" => 0, "disabled" => 0],
        "repeat" => ["enabled" => 0, "disabled" => 0],
        "on_readable" => ["enabled" => 0, "disabled" => 0],
        "on_writable" => ["enabled" => 0, "disabled" => 0],
        "on_signal" => ["enabled" => 0, "disabled" => 0],
    ];
    $totals = ["referenced" => 0, "unreferenced" => 0];

    foreach ($this->watchers as $watcher) {
        switch ($watcher->type) {
            case Watcher::DEFER:
                $bucket = "defer";
                break;
            case Watcher::DELAY:
                $bucket = "delay";
                break;
            case Watcher::REPEAT:
                $bucket = "repeat";
                break;
            case Watcher::READABLE:
                $bucket = "on_readable";
                break;
            case Watcher::WRITABLE:
                $bucket = "on_writable";
                break;
            case Watcher::SIGNAL:
                $bucket = "on_signal";
                break;
            default:
                throw new \DomainException("Unknown watcher type");
        }

        if (!$watcher->enabled) {
            ++$perType[$bucket]["disabled"];
            continue;
        }

        ++$perType[$bucket]["enabled"];
        ++$totals[$watcher->referenced ? "referenced" : "unreferenced"];
    }

    // All keys are strings, so array_merge keeps them and their order.
    return \array_merge(
        ["watchers" => $totals],
        $perType,
        ["running" => (bool) $this->running]
    );
}
/**
 * Supplies the data shown when this object is dumped (e.g. by var_dump()).
 *
 * @return array The same array of data as getInfo().
 */
public function __debugInfo() {
    $info = $this->getInfo();

    return $info;
}
}