mirror of
https://github.com/danog/amp.git
synced 2025-01-22 05:11:42 +01:00
Merge amphp/amp repository into amphp/amp
This commit is contained in:
commit
f4f9008ca0
34
.travis.yml
34
.travis.yml
@ -17,6 +17,40 @@ env:
|
||||
- DEPS=highest
|
||||
|
||||
install:
|
||||
- git clone https://github.com/libuv/libuv;
|
||||
pushd libuv;
|
||||
git checkout $(git describe --tags);
|
||||
./autogen.sh;
|
||||
./configure --prefix=$(dirname `pwd`)/libuv-install;
|
||||
make;
|
||||
make install;
|
||||
popd;
|
||||
git clone https://github.com/bwoebi/php-uv.git;
|
||||
pushd php-uv;
|
||||
phpize;
|
||||
./configure --with-uv=$(dirname `pwd`)/libuv-install;
|
||||
make;
|
||||
make install;
|
||||
popd;
|
||||
echo "extension=uv.so" >> "$(php -r 'echo php_ini_loaded_file();')";
|
||||
|
||||
- curl -LS https://pecl.php.net/get/ev | tar -xz;
|
||||
pushd ev-*;
|
||||
phpize;
|
||||
./configure;
|
||||
make;
|
||||
make install;
|
||||
popd;
|
||||
echo "extension=ev.so" >> "$(php -r 'echo php_ini_loaded_file();')";
|
||||
- curl -LS https://pecl.php.net/get/event | tar -xz;
|
||||
pushd event-*;
|
||||
phpize;
|
||||
./configure --with-event-core --with-event-extra --with-event-pthreads;
|
||||
make;
|
||||
make install;
|
||||
popd;
|
||||
echo "extension=event.so" >> "$(php -r 'echo php_ini_loaded_file();')";
|
||||
|
||||
- if [ "$DEPS" = "lowest" ]; then
|
||||
composer update -n --prefer-source --prefer-lowest;
|
||||
else
|
||||
|
190
lib/EvLoop.php
Normal file
190
lib/EvLoop.php
Normal file
@ -0,0 +1,190 @@
|
||||
<?php
|
||||
|
||||
namespace Amp\Loop;
|
||||
|
||||
use Amp\Loop\Internal\Watcher;
|
||||
|
||||
class EvLoop extends Loop {
|
||||
/** @var \EvLoop */
|
||||
private $handle;
|
||||
|
||||
/** @var \EvWatcher[] */
|
||||
private $events = [];
|
||||
|
||||
/** @var callable */
|
||||
private $ioCallback;
|
||||
|
||||
/** @var callable */
|
||||
private $timerCallback;
|
||||
|
||||
/** @var callable */
|
||||
private $signalCallback;
|
||||
|
||||
/** @var \EvSignal[] */
|
||||
private $signals = [];
|
||||
|
||||
/** @var \EvSignal[]|null */
|
||||
private static $activeSignals;
|
||||
|
||||
public static function supported() {
|
||||
return \extension_loaded("ev");
|
||||
}
|
||||
|
||||
public function __construct() {
|
||||
$this->handle = new \EvLoop;
|
||||
|
||||
if (self::$activeSignals === null) {
|
||||
self::$activeSignals = &$this->signals;
|
||||
}
|
||||
|
||||
$this->ioCallback = function (\EvIO $event) {
|
||||
/** @var \Amp\Loop\Internal\Watcher $watcher */
|
||||
$watcher = $event->data;
|
||||
|
||||
$callback = $watcher->callback;
|
||||
$callback($watcher->id, $watcher->value, $watcher->data);
|
||||
};
|
||||
|
||||
$this->timerCallback = function (\EvTimer $event) {
|
||||
/** @var \Amp\Loop\Internal\Watcher $watcher */
|
||||
$watcher = $event->data;
|
||||
|
||||
if ($watcher->type & Watcher::DELAY) {
|
||||
$this->cancel($watcher->id);
|
||||
}
|
||||
|
||||
$callback = $watcher->callback;
|
||||
$callback($watcher->id, $watcher->data);
|
||||
};
|
||||
|
||||
$this->signalCallback = function (\EvSignal $event) {
|
||||
/** @var \Amp\Loop\Internal\Watcher $watcher */
|
||||
$watcher = $event->data;
|
||||
|
||||
$callback = $watcher->callback;
|
||||
$callback($watcher->id, $watcher->value, $watcher->data);
|
||||
};
|
||||
}
|
||||
|
||||
public function __destruct() {
|
||||
foreach ($this->events as $event) {
|
||||
$event->stop();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritdoc}
|
||||
*/
|
||||
public function run() {
|
||||
$active = self::$activeSignals;
|
||||
|
||||
foreach ($active as $event) {
|
||||
$event->stop();
|
||||
}
|
||||
|
||||
self::$activeSignals = &$this->signals;
|
||||
|
||||
foreach ($this->signals as $event) {
|
||||
$event->start();
|
||||
}
|
||||
|
||||
try {
|
||||
parent::run();
|
||||
} finally {
|
||||
foreach ($this->signals as $event) {
|
||||
$event->stop();
|
||||
}
|
||||
|
||||
self::$activeSignals = &$active;
|
||||
|
||||
foreach ($active as $event) {
|
||||
$event->start();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritdoc}
|
||||
*/
|
||||
public function stop() {
|
||||
$this->handle->stop();
|
||||
parent::stop();
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritdoc}
|
||||
*/
|
||||
protected function dispatch($blocking) {
|
||||
$this->handle->run($blocking ? \Ev::RUN_ONCE : \Ev::RUN_ONCE | \Ev::RUN_NOWAIT);
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritdoc}
|
||||
*/
|
||||
protected function activate(array $watchers) {
|
||||
foreach ($watchers as $watcher) {
|
||||
if (!isset($this->events[$id = $watcher->id])) {
|
||||
switch ($watcher->type) {
|
||||
case Watcher::READABLE:
|
||||
$this->events[$id] = $this->handle->io($watcher->value, \Ev::READ, $this->ioCallback, $watcher);
|
||||
break;
|
||||
|
||||
case Watcher::WRITABLE:
|
||||
$this->events[$id] = $this->handle->io($watcher->value, \Ev::WRITE, $this->ioCallback, $watcher);
|
||||
break;
|
||||
|
||||
case Watcher::DELAY:
|
||||
case Watcher::REPEAT:
|
||||
$interval = $watcher->value / self::MILLISEC_PER_SEC;
|
||||
$this->events[$id] = $this->handle->timer(
|
||||
$interval,
|
||||
$watcher->type & Watcher::REPEAT ? $interval : 0,
|
||||
$this->timerCallback,
|
||||
$watcher
|
||||
);
|
||||
break;
|
||||
|
||||
case Watcher::SIGNAL:
|
||||
$this->events[$id] = $this->handle->signal($watcher->value, $this->signalCallback, $watcher);
|
||||
break;
|
||||
|
||||
default:
|
||||
throw new \DomainException("Unknown watcher type");
|
||||
}
|
||||
} else {
|
||||
$this->events[$id]->start();
|
||||
}
|
||||
|
||||
if ($watcher->type === Watcher::SIGNAL) {
|
||||
$this->signals[$id] = $this->events[$id];
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritdoc}
|
||||
*/
|
||||
protected function deactivate(Watcher $watcher) {
|
||||
if (isset($this->events[$id = $watcher->id])) {
|
||||
$this->events[$id]->stop();
|
||||
if ($watcher->type === Watcher::SIGNAL) {
|
||||
unset($this->signals[$id]);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritdoc}
|
||||
*/
|
||||
public function cancel($watcherIdentifier) {
|
||||
parent::cancel($watcherIdentifier);
|
||||
unset($this->events[$watcherIdentifier]);
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritdoc}
|
||||
*/
|
||||
public function getHandle() {
|
||||
return $this->handle;
|
||||
}
|
||||
}
|
213
lib/EventLoop.php
Normal file
213
lib/EventLoop.php
Normal file
@ -0,0 +1,213 @@
|
||||
<?php
|
||||
|
||||
namespace Amp\Loop;
|
||||
|
||||
use Amp\Loop\Internal\Watcher;
|
||||
|
||||
class EventLoop extends Loop {
|
||||
/** @var \EventBase */
|
||||
private $handle;
|
||||
|
||||
/** @var \Event[] */
|
||||
private $events = [];
|
||||
|
||||
/** @var callable */
|
||||
private $ioCallback;
|
||||
|
||||
/** @var callable */
|
||||
private $timerCallback;
|
||||
|
||||
/** @var callable */
|
||||
private $signalCallback;
|
||||
|
||||
/** @var \Event[] */
|
||||
private $signals = [];
|
||||
|
||||
/** @var \Event[]|null */
|
||||
private static $activeSignals;
|
||||
|
||||
public static function supported() {
|
||||
return \extension_loaded("event");
|
||||
}
|
||||
|
||||
public function __construct() {
|
||||
$this->handle = new \EventBase;
|
||||
|
||||
if (self::$activeSignals === null) {
|
||||
self::$activeSignals = &$this->signals;
|
||||
}
|
||||
|
||||
$this->ioCallback = function ($resource, $what, Watcher $watcher) {
|
||||
$callback = $watcher->callback;
|
||||
$callback($watcher->id, $watcher->value, $watcher->data);
|
||||
};
|
||||
|
||||
$this->timerCallback = function ($resource, $what, Watcher $watcher) {
|
||||
if ($watcher->type & Watcher::DELAY) {
|
||||
$this->cancel($watcher->id);
|
||||
}
|
||||
|
||||
$callback = $watcher->callback;
|
||||
$callback($watcher->id, $watcher->data);
|
||||
};
|
||||
|
||||
$this->signalCallback = function ($signum, $what, Watcher $watcher) {
|
||||
$callback = $watcher->callback;
|
||||
$callback($watcher->id, $watcher->value, $watcher->data);
|
||||
};
|
||||
}
|
||||
|
||||
public function __destruct() {
|
||||
foreach ($this->events as $event) {
|
||||
$event->free();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritdoc}
|
||||
*/
|
||||
public function run() {
|
||||
$active = self::$activeSignals;
|
||||
|
||||
foreach ($active as $event) {
|
||||
$event->del();
|
||||
}
|
||||
|
||||
self::$activeSignals = &$this->signals;
|
||||
|
||||
foreach ($this->signals as $event) {
|
||||
$event->add();
|
||||
}
|
||||
|
||||
try {
|
||||
parent::run();
|
||||
} finally {
|
||||
foreach ($this->signals as $event) {
|
||||
$event->del();
|
||||
}
|
||||
|
||||
self::$activeSignals = &$active;
|
||||
|
||||
foreach ($active as $event) {
|
||||
$event->add();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritdoc}
|
||||
*/
|
||||
public function stop() {
|
||||
$this->handle->stop();
|
||||
parent::stop();
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritdoc}
|
||||
*/
|
||||
protected function dispatch($blocking) {
|
||||
$this->handle->loop($blocking ? \EventBase::LOOP_ONCE : \EventBase::LOOP_ONCE | \EventBase::LOOP_NONBLOCK);
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritdoc}
|
||||
*/
|
||||
protected function activate(array $watchers) {
|
||||
foreach ($watchers as $watcher) {
|
||||
if (!isset($this->events[$id = $watcher->id])) {
|
||||
switch ($watcher->type) {
|
||||
case Watcher::READABLE:
|
||||
$this->events[$id] = new \Event(
|
||||
$this->handle,
|
||||
$watcher->value,
|
||||
\Event::READ | \Event::PERSIST,
|
||||
$this->ioCallback,
|
||||
$watcher
|
||||
);
|
||||
break;
|
||||
|
||||
case Watcher::WRITABLE:
|
||||
$this->events[$id] = new \Event(
|
||||
$this->handle,
|
||||
$watcher->value,
|
||||
\Event::WRITE | \Event::PERSIST,
|
||||
$this->ioCallback,
|
||||
$watcher
|
||||
);
|
||||
break;
|
||||
|
||||
case Watcher::DELAY:
|
||||
case Watcher::REPEAT:
|
||||
$this->events[$id] = new \Event(
|
||||
$this->handle,
|
||||
-1,
|
||||
\Event::TIMEOUT | \Event::PERSIST,
|
||||
$this->timerCallback,
|
||||
$watcher
|
||||
);
|
||||
break;
|
||||
|
||||
case Watcher::SIGNAL:
|
||||
$this->events[$id] = new \Event(
|
||||
$this->handle,
|
||||
$watcher->value,
|
||||
\Event::SIGNAL | \Event::PERSIST,
|
||||
$this->signalCallback,
|
||||
$watcher
|
||||
);
|
||||
break;
|
||||
|
||||
default:
|
||||
throw new \DomainException("Unknown watcher type");
|
||||
}
|
||||
}
|
||||
|
||||
switch ($watcher->type) {
|
||||
case Watcher::DELAY:
|
||||
case Watcher::REPEAT:
|
||||
$this->events[$id]->add($watcher->value / self::MILLISEC_PER_SEC);
|
||||
break;
|
||||
|
||||
case Watcher::SIGNAL:
|
||||
$this->signals[$id] = $this->events[$id];
|
||||
// No break
|
||||
|
||||
default:
|
||||
$this->events[$id]->add();
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritdoc}
|
||||
*/
|
||||
protected function deactivate(Watcher $watcher) {
|
||||
if (isset($this->events[$id = $watcher->id])) {
|
||||
$this->events[$id]->del();
|
||||
|
||||
if ($watcher->type === Watcher::SIGNAL) {
|
||||
unset($this->signals[$id]);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritdoc}
|
||||
*/
|
||||
public function cancel($watcherIdentifier) {
|
||||
parent::cancel($watcherIdentifier);
|
||||
|
||||
if (isset($this->events[$watcherIdentifier])) {
|
||||
$this->events[$watcherIdentifier]->free();
|
||||
unset($this->events[$watcherIdentifier]);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritdoc}
|
||||
*/
|
||||
public function getHandle() {
|
||||
return $this->handle;
|
||||
}
|
||||
}
|
43
lib/Internal/Watcher.php
Normal file
43
lib/Internal/Watcher.php
Normal file
@ -0,0 +1,43 @@
|
||||
<?php
|
||||
|
||||
namespace Amp\Loop\Internal;
|
||||
|
||||
class Watcher {
|
||||
const DEFER = 0b00000001;
|
||||
const TIMER = 0b00000110;
|
||||
const DELAY = 0b00000010;
|
||||
const REPEAT = 0b00000100;
|
||||
const IO = 0b00011000;
|
||||
const READABLE = 0b00001000;
|
||||
const WRITABLE = 0b00010000;
|
||||
const SIGNAL = 0b00100000;
|
||||
|
||||
/** @var int */
|
||||
public $type;
|
||||
|
||||
/** @var bool */
|
||||
public $enabled = true;
|
||||
|
||||
/** @var bool */
|
||||
public $referenced = true;
|
||||
|
||||
/** @var string */
|
||||
public $id;
|
||||
|
||||
/** @var callable */
|
||||
public $callback;
|
||||
|
||||
/**
|
||||
* Data provided to the watcher callback.
|
||||
*
|
||||
* @var mixed
|
||||
*/
|
||||
public $data;
|
||||
|
||||
/**
|
||||
* Watcher-dependent value storage. Stream for IO watchers, signo for signal watchers, interval for timers.
|
||||
*
|
||||
* @var mixed
|
||||
*/
|
||||
public $value;
|
||||
}
|
415
lib/Loop.php
Normal file
415
lib/Loop.php
Normal file
@ -0,0 +1,415 @@
|
||||
<?php
|
||||
|
||||
namespace Amp\Loop;
|
||||
|
||||
use Amp\Loop\Internal\Watcher;
|
||||
use AsyncInterop\Loop\Driver;
|
||||
use AsyncInterop\Loop\InvalidWatcherException;
|
||||
|
||||
abstract class Loop extends Driver {
|
||||
// Don't use 1e3 / 1e6, they result in a float instead of int
|
||||
const MILLISEC_PER_SEC = 1000;
|
||||
const MICROSEC_PER_SEC = 1000000;
|
||||
|
||||
/** @var string */
|
||||
private $nextId = "a";
|
||||
|
||||
/** @var \Amp\Loop\Internal\Watcher[] */
|
||||
private $watchers = [];
|
||||
|
||||
/** @var \Amp\Loop\Internal\Watcher[] */
|
||||
private $enableQueue = [];
|
||||
|
||||
/** @var \Amp\Loop\Internal\Watcher[] */
|
||||
private $deferQueue = [];
|
||||
|
||||
/** @var \Amp\Loop\Internal\Watcher[] */
|
||||
private $nextTickQueue = [];
|
||||
|
||||
/** @var callable|null */
|
||||
private $errorHandler;
|
||||
|
||||
/** @var int */
|
||||
private $running = 0;
|
||||
|
||||
/**
|
||||
* {@inheritdoc}
|
||||
*/
|
||||
public function run() {
|
||||
$previous = $this->running;
|
||||
++$this->running;
|
||||
|
||||
try {
|
||||
while ($this->running > $previous) {
|
||||
if ($this->isEmpty()) {
|
||||
return;
|
||||
}
|
||||
$this->tick();
|
||||
}
|
||||
} finally {
|
||||
$this->running = $previous;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritdoc}
|
||||
*/
|
||||
public function stop() {
|
||||
--$this->running > 0 ?: $this->running = 0;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return bool True if no enabled and referenced watchers remain in the loop.
|
||||
*/
|
||||
private function isEmpty() {
|
||||
foreach ($this->watchers as $watcher) {
|
||||
if ($watcher->enabled && $watcher->referenced) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
* Executes a single tick of the event loop.
|
||||
*/
|
||||
private function tick() {
|
||||
$this->deferQueue = \array_merge($this->deferQueue, $this->nextTickQueue);
|
||||
$this->nextTickQueue = [];
|
||||
|
||||
$this->activate($this->enableQueue);
|
||||
$this->enableQueue = [];
|
||||
|
||||
try {
|
||||
foreach ($this->deferQueue as $watcher) {
|
||||
if (!isset($this->deferQueue[$watcher->id])) {
|
||||
continue; // Watcher disabled by another defer watcher.
|
||||
}
|
||||
|
||||
unset($this->watchers[$watcher->id], $this->deferQueue[$watcher->id]);
|
||||
|
||||
$callback = $watcher->callback;
|
||||
$callback($watcher->id, $watcher->data);
|
||||
}
|
||||
|
||||
$this->dispatch(empty($this->nextTickQueue) && empty($this->enableQueue) && $this->running);
|
||||
|
||||
} catch (\Throwable $exception) {
|
||||
if (null === $this->errorHandler) {
|
||||
throw $exception;
|
||||
}
|
||||
|
||||
$errorHandler = $this->errorHandler;
|
||||
$errorHandler($exception);
|
||||
} catch (\Exception $exception) { // @todo Remove when PHP 5.x support is no longer needed.
|
||||
if (null === $this->errorHandler) {
|
||||
throw $exception;
|
||||
}
|
||||
|
||||
$errorHandler = $this->errorHandler;
|
||||
$errorHandler($exception);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Dispatches any pending read/write, timer, and signal events.
|
||||
*
|
||||
* @param bool $blocking
|
||||
*/
|
||||
abstract protected function dispatch($blocking);
|
||||
|
||||
/**
|
||||
* Activates (enables) all the given watchers.
|
||||
*
|
||||
* @param \Amp\Loop\Internal\Watcher[] $watchers
|
||||
*/
|
||||
abstract protected function activate(array $watchers);
|
||||
|
||||
/**
|
||||
* Deactivates (disables) the given watcher.
|
||||
*
|
||||
* @param \Amp\Loop\Internal\Watcher $watcher
|
||||
*/
|
||||
abstract protected function deactivate(Watcher $watcher);
|
||||
|
||||
/**
|
||||
* {@inheritdoc}
|
||||
*/
|
||||
public function defer(callable $callback, $data = null) {
|
||||
$watcher = new Watcher;
|
||||
$watcher->type = Watcher::DEFER;
|
||||
$watcher->id = $this->nextId++;
|
||||
$watcher->callback = $callback;
|
||||
$watcher->data = $data;
|
||||
|
||||
$this->watchers[$watcher->id] = $watcher;
|
||||
$this->nextTickQueue[$watcher->id] = $watcher;
|
||||
|
||||
return $watcher->id;
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritdoc}
|
||||
*/
|
||||
public function delay($delay, callable $callback, $data = null) {
|
||||
$delay = (int) $delay;
|
||||
|
||||
if ($delay < 0) {
|
||||
throw new \InvalidArgumentException("Delay must be greater than or equal to zero");
|
||||
}
|
||||
|
||||
$watcher = new Watcher;
|
||||
$watcher->type = Watcher::DELAY;
|
||||
$watcher->id = $this->nextId++;
|
||||
$watcher->callback = $callback;
|
||||
$watcher->value = $delay;
|
||||
$watcher->data = $data;
|
||||
|
||||
$this->watchers[$watcher->id] = $watcher;
|
||||
$this->enableQueue[$watcher->id] = $watcher;
|
||||
|
||||
return $watcher->id;
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritdoc}
|
||||
*/
|
||||
public function repeat($interval, callable $callback, $data = null) {
|
||||
$interval = (int) $interval;
|
||||
|
||||
if ($interval < 0) {
|
||||
throw new \InvalidArgumentException("Interval must be greater than or equal to zero");
|
||||
}
|
||||
|
||||
$watcher = new Watcher;
|
||||
$watcher->type = Watcher::REPEAT;
|
||||
$watcher->id = $this->nextId++;
|
||||
$watcher->callback = $callback;
|
||||
$watcher->value = $interval;
|
||||
$watcher->data = $data;
|
||||
|
||||
$this->watchers[$watcher->id] = $watcher;
|
||||
$this->enableQueue[$watcher->id] = $watcher;
|
||||
|
||||
return $watcher->id;
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritdoc}
|
||||
*/
|
||||
public function onReadable($stream, callable $callback, $data = null) {
|
||||
$watcher = new Watcher;
|
||||
$watcher->type = Watcher::READABLE;
|
||||
$watcher->id = $this->nextId++;
|
||||
$watcher->callback = $callback;
|
||||
$watcher->value = $stream;
|
||||
$watcher->data = $data;
|
||||
|
||||
$this->watchers[$watcher->id] = $watcher;
|
||||
$this->enableQueue[$watcher->id] = $watcher;
|
||||
|
||||
return $watcher->id;
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritdoc}
|
||||
*/
|
||||
public function onWritable($stream, callable $callback, $data = null) {
|
||||
$watcher = new Watcher;
|
||||
$watcher->type = Watcher::WRITABLE;
|
||||
$watcher->id = $this->nextId++;
|
||||
$watcher->callback = $callback;
|
||||
$watcher->value = $stream;
|
||||
$watcher->data = $data;
|
||||
|
||||
$this->watchers[$watcher->id] = $watcher;
|
||||
$this->enableQueue[$watcher->id] = $watcher;
|
||||
|
||||
return $watcher->id;
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritdoc}
|
||||
*
|
||||
* @throws \AsyncInterop\Loop\UnsupportedFeatureException If the pcntl extension is not available.
|
||||
* @throws \RuntimeException If creating the backend signal handler fails.
|
||||
*/
|
||||
public function onSignal($signo, callable $callback, $data = null) {
|
||||
$watcher = new Watcher;
|
||||
$watcher->type = Watcher::SIGNAL;
|
||||
$watcher->id = $this->nextId++;
|
||||
$watcher->callback = $callback;
|
||||
$watcher->value = $signo;
|
||||
$watcher->data = $data;
|
||||
|
||||
$this->watchers[$watcher->id] = $watcher;
|
||||
$this->enableQueue[$watcher->id] = $watcher;
|
||||
|
||||
return $watcher->id;
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritdoc}
|
||||
*/
|
||||
public function enable($watcherIdentifier) {
|
||||
if (!isset($this->watchers[$watcherIdentifier])) {
|
||||
throw new InvalidWatcherException($watcherIdentifier, "Cannot enable an invalid watcher identifier: '{$watcherIdentifier}'");
|
||||
}
|
||||
|
||||
$watcher = $this->watchers[$watcherIdentifier];
|
||||
|
||||
if ($watcher->enabled) {
|
||||
return; // Watcher already enabled.
|
||||
}
|
||||
|
||||
$watcher->enabled = true;
|
||||
|
||||
switch ($watcher->type) {
|
||||
case Watcher::DEFER:
|
||||
$this->nextTickQueue[$watcher->id] = $watcher;
|
||||
break;
|
||||
|
||||
default:
|
||||
$this->enableQueue[$watcher->id] = $watcher;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritdoc}
|
||||
*/
|
||||
public function disable($watcherIdentifier) {
|
||||
if (!isset($this->watchers[$watcherIdentifier])) {
|
||||
return;
|
||||
}
|
||||
|
||||
$watcher = $this->watchers[$watcherIdentifier];
|
||||
|
||||
if (!$watcher->enabled) {
|
||||
return; // Watcher already disabled.
|
||||
}
|
||||
|
||||
$watcher->enabled = false;
|
||||
$id = $watcher->id;
|
||||
|
||||
switch ($watcher->type) {
|
||||
case Watcher::DEFER:
|
||||
if (isset($this->nextTickQueue[$id])) {
|
||||
// Watcher was only queued to be enabled.
|
||||
unset($this->nextTickQueue[$id]);
|
||||
} else {
|
||||
unset($this->deferQueue[$id]);
|
||||
}
|
||||
break;
|
||||
|
||||
default:
|
||||
if (isset($this->enableQueue[$id])) {
|
||||
// Watcher was only queued to be enabled.
|
||||
unset($this->enableQueue[$id]);
|
||||
} else {
|
||||
$this->deactivate($watcher);
|
||||
}
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritdoc}
|
||||
*/
|
||||
public function cancel($watcherIdentifier) {
|
||||
$this->disable($watcherIdentifier);
|
||||
unset($this->watchers[$watcherIdentifier]);
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritdoc}
|
||||
*/
|
||||
public function reference($watcherIdentifier) {
|
||||
if (!isset($this->watchers[$watcherIdentifier])) {
|
||||
throw new InvalidWatcherException($watcherIdentifier, "Cannot reference an invalid watcher identifier: '{$watcherIdentifier}'");
|
||||
}
|
||||
|
||||
$this->watchers[$watcherIdentifier]->referenced = true;
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritdoc}
|
||||
*/
|
||||
public function unreference($watcherIdentifier) {
|
||||
if (!isset($this->watchers[$watcherIdentifier])) {
|
||||
throw new InvalidWatcherException($watcherIdentifier, "Cannot unreference an invalid watcher identifier: '{$watcherIdentifier}'");
|
||||
}
|
||||
|
||||
$this->watchers[$watcherIdentifier]->referenced = false;
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritdoc}
|
||||
*/
|
||||
public function setErrorHandler(callable $callback = null) {
|
||||
$previous = $this->errorHandler;
|
||||
$this->errorHandler = $callback;
|
||||
return $previous;
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritdoc}
|
||||
*/
|
||||
public function getInfo() {
|
||||
$watchers = [
|
||||
"referenced" => 0,
|
||||
"unreferenced" => 0,
|
||||
];
|
||||
|
||||
$defer = $delay = $repeat = $onReadable = $onWritable = $onSignal = [
|
||||
"enabled" => 0,
|
||||
"disabled" => 0,
|
||||
];
|
||||
|
||||
foreach ($this->watchers as $watcher) {
|
||||
switch ($watcher->type) {
|
||||
case Watcher::READABLE: $array = &$onReadable; break;
|
||||
case Watcher::WRITABLE: $array = &$onWritable; break;
|
||||
case Watcher::SIGNAL: $array = &$onSignal; break;
|
||||
case Watcher::DEFER: $array = &$defer; break;
|
||||
case Watcher::DELAY: $array = &$delay; break;
|
||||
case Watcher::REPEAT: $array = &$repeat; break;
|
||||
|
||||
default: throw new \DomainException("Unknown watcher type");
|
||||
}
|
||||
|
||||
if ($watcher->enabled) {
|
||||
++$array["enabled"];
|
||||
|
||||
if ($watcher->referenced) {
|
||||
++$watchers["referenced"];
|
||||
} else {
|
||||
++$watchers["unreferenced"];
|
||||
}
|
||||
} else {
|
||||
++$array["disabled"];
|
||||
}
|
||||
}
|
||||
|
||||
return [
|
||||
"watchers" => $watchers,
|
||||
"defer" => $defer,
|
||||
"delay" => $delay,
|
||||
"repeat" => $repeat,
|
||||
"on_readable" => $onReadable,
|
||||
"on_writable" => $onWritable,
|
||||
"on_signal" => $onSignal,
|
||||
"running" => (bool) $this->running,
|
||||
];
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the same array of data as getInfo().
|
||||
*
|
||||
* @return array
|
||||
*/
|
||||
public function __debugInfo() {
|
||||
return $this->getInfo();
|
||||
}
|
||||
}
|
29
lib/LoopFactory.php
Normal file
29
lib/LoopFactory.php
Normal file
@ -0,0 +1,29 @@
|
||||
<?php
|
||||
|
||||
namespace Amp\Loop;
|
||||
|
||||
use AsyncInterop\Loop\DriverFactory;
|
||||
|
||||
/**
|
||||
* Default loop factory for Amp.
|
||||
*/
|
||||
class LoopFactory implements DriverFactory {
|
||||
/**
|
||||
* {@inheritdoc}
|
||||
*/
|
||||
public function create() {
|
||||
if (UvLoop::supported()) {
|
||||
return new UvLoop;
|
||||
}
|
||||
|
||||
if (EvLoop::supported()) {
|
||||
return new EvLoop;
|
||||
}
|
||||
|
||||
if (EventLoop::supported()) {
|
||||
return new EventLoop;
|
||||
}
|
||||
|
||||
return new NativeLoop;
|
||||
}
|
||||
}
|
283
lib/NativeLoop.php
Normal file
283
lib/NativeLoop.php
Normal file
@ -0,0 +1,283 @@
|
||||
<?php
|
||||
|
||||
namespace Amp\Loop;
|
||||
|
||||
use Amp\Loop\Internal\Watcher;
|
||||
use AsyncInterop\Loop\UnsupportedFeatureException;
|
||||
|
||||
class NativeLoop extends Loop {
|
||||
/** @var resource[] */
|
||||
private $readStreams = [];
|
||||
|
||||
/** @var \Amp\Loop\Internal\Watcher[][] */
|
||||
private $readWatchers = [];
|
||||
|
||||
/** @var resource[] */
|
||||
private $writeStreams = [];
|
||||
|
||||
/** @var \Amp\Loop\Internal\Watcher[][] */
|
||||
private $writeWatchers = [];
|
||||
|
||||
/** @var int[] */
|
||||
private $timerExpires = [];
|
||||
|
||||
/** @var \SplPriorityQueue */
|
||||
private $timerQueue;
|
||||
|
||||
/** @var \Amp\Loop\Internal\Watcher[][] */
|
||||
private $signalWatchers = [];
|
||||
|
||||
/** @var bool */
|
||||
private $signalHandling;
|
||||
|
||||
public function __construct() {
|
||||
$this->timerQueue = new \SplPriorityQueue();
|
||||
$this->signalHandling = \extension_loaded("pcntl");
|
||||
}
|
||||
|
||||
protected function dispatch($blocking) {
|
||||
$this->selectStreams(
|
||||
$this->readStreams,
|
||||
$this->writeStreams,
|
||||
$blocking ? $this->getTimeout() : 0
|
||||
);
|
||||
|
||||
if (!empty($this->timerExpires)) {
|
||||
$time = (int) (\microtime(true) * self::MILLISEC_PER_SEC);
|
||||
|
||||
while (!$this->timerQueue->isEmpty()) {
|
||||
list($watcher, $expiration) = $this->timerQueue->top();
|
||||
|
||||
$id = $watcher->id;
|
||||
|
||||
if (!isset($this->timerExpires[$id]) || $expiration !== $this->timerExpires[$id]) {
|
||||
$this->timerQueue->extract(); // Timer was removed from queue.
|
||||
continue;
|
||||
}
|
||||
|
||||
if ($this->timerExpires[$id] > $time) { // Timer at top of queue has not expired.
|
||||
break;
|
||||
}
|
||||
|
||||
$this->timerQueue->extract();
|
||||
|
||||
if ($watcher->type & Watcher::REPEAT) {
|
||||
$this->activate([$watcher]);
|
||||
} else {
|
||||
$this->cancel($id);
|
||||
}
|
||||
|
||||
// Execute the timer.
|
||||
$callback = $watcher->callback;
|
||||
$callback($id, $watcher->data);
|
||||
}
|
||||
}
|
||||
|
||||
if ($this->signalHandling) {
|
||||
\pcntl_signal_dispatch();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @param resource[] $read
|
||||
* @param resource[] $write
|
||||
* @param int $timeout
|
||||
*/
|
||||
private function selectStreams(array $read, array $write, $timeout) {
|
||||
$timeout /= self::MILLISEC_PER_SEC;
|
||||
|
||||
if (!empty($read) || !empty($write)) { // Use stream_select() if there are any streams in the loop.
|
||||
if ($timeout >= 0) {
|
||||
$seconds = (int) $timeout;
|
||||
$microseconds = (int) (($timeout - $seconds) * self::MICROSEC_PER_SEC);
|
||||
} else {
|
||||
$seconds = null;
|
||||
$microseconds = null;
|
||||
}
|
||||
|
||||
$except = null;
|
||||
|
||||
// Error reporting suppressed since stream_select() emits an E_WARNING if it is interrupted by a signal.
|
||||
$count = @\stream_select($read, $write, $except, $seconds, $microseconds);
|
||||
|
||||
if ($count) {
|
||||
foreach ($read as $stream) {
|
||||
$streamId = (int) $stream;
|
||||
if (isset($this->readWatchers[$streamId])) {
|
||||
foreach ($this->readWatchers[$streamId] as $watcher) {
|
||||
if (!isset($this->readWatchers[$streamId][$watcher->id])) {
|
||||
continue; // Watcher disabled by another IO watcher.
|
||||
}
|
||||
|
||||
$callback = $watcher->callback;
|
||||
$callback($watcher->id, $stream, $watcher->data);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
foreach ($write as $stream) {
|
||||
$streamId = (int) $stream;
|
||||
if (isset($this->writeWatchers[$streamId])) {
|
||||
foreach ($this->writeWatchers[$streamId] as $watcher) {
|
||||
if (!isset($this->writeWatchers[$streamId][$watcher->id])) {
|
||||
continue; // Watcher disabled by another IO watcher.
|
||||
}
|
||||
|
||||
$callback = $watcher->callback;
|
||||
$callback($watcher->id, $stream, $watcher->data);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
if ($timeout > 0) { // Otherwise sleep with usleep() if $timeout > 0.
|
||||
\usleep($timeout * self::MICROSEC_PER_SEC);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @return int Milliseconds until next timer expires or -1 if there are no pending times.
|
||||
*/
|
||||
private function getTimeout() {
|
||||
while (!$this->timerQueue->isEmpty()) {
|
||||
list($watcher, $expiration) = $this->timerQueue->top();
|
||||
|
||||
$id = $watcher->id;
|
||||
|
||||
if (!isset($this->timerExpires[$id]) || $expiration !== $this->timerExpires[$id]) {
|
||||
$this->timerQueue->extract(); // Timer was removed from queue.
|
||||
continue;
|
||||
}
|
||||
|
||||
$expiration -= (int) (\microtime(true) * self::MILLISEC_PER_SEC);
|
||||
|
||||
if ($expiration < 0) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
return $expiration;
|
||||
}
|
||||
|
||||
return -1;
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritdoc}
|
||||
*
|
||||
* @throws \AsyncInterop\Loop\UnsupportedFeatureException If the pcntl extension is not available.
|
||||
* @throws \RuntimeException If creating the backend signal handler fails.
|
||||
*/
|
||||
public function onSignal($signo, callable $callback, $data = null) {
|
||||
if (!$this->signalHandling) {
|
||||
throw new UnsupportedFeatureException("Signal handling requires the pcntl extension");
|
||||
}
|
||||
|
||||
return parent::onSignal($signo, $callback, $data);
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritdoc}
|
||||
*/
|
||||
protected function activate(array $watchers) {
|
||||
foreach ($watchers as $watcher) {
|
||||
switch ($watcher->type) {
|
||||
case Watcher::READABLE:
|
||||
$streamId = (int) $watcher->value;
|
||||
$this->readWatchers[$streamId][$watcher->id] = $watcher;
|
||||
$this->readStreams[$streamId] = $watcher->value;
|
||||
break;
|
||||
|
||||
case Watcher::WRITABLE:
|
||||
$streamId = (int) $watcher->value;
|
||||
$this->writeWatchers[$streamId][$watcher->id] = $watcher;
|
||||
$this->writeStreams[$streamId] = $watcher->value;
|
||||
break;
|
||||
|
||||
case Watcher::DELAY:
|
||||
case Watcher::REPEAT:
|
||||
$expiration = (int) (\microtime(true) * self::MILLISEC_PER_SEC) + $watcher->value;
|
||||
$this->timerExpires[$watcher->id] = $expiration;
|
||||
$this->timerQueue->insert([$watcher, $expiration], -$expiration);
|
||||
break;
|
||||
|
||||
case Watcher::SIGNAL:
|
||||
if (!isset($this->signalWatchers[$watcher->value])) {
|
||||
if (!@\pcntl_signal($watcher->value, [$this, 'handleSignal'])) {
|
||||
throw new \RuntimeException("Failed to register signal handler");
|
||||
}
|
||||
}
|
||||
|
||||
$this->signalWatchers[$watcher->value][$watcher->id] = $watcher;
|
||||
break;
|
||||
|
||||
default:
|
||||
throw new \DomainException("Unknown watcher type");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritdoc}
|
||||
*/
|
||||
protected function deactivate(Watcher $watcher) {
|
||||
switch ($watcher->type) {
|
||||
case Watcher::READABLE:
|
||||
$streamId = (int) $watcher->value;
|
||||
unset($this->readWatchers[$streamId][$watcher->id]);
|
||||
if (empty($this->readWatchers[$streamId])) {
|
||||
unset($this->readWatchers[$streamId], $this->readStreams[$streamId]);
|
||||
}
|
||||
break;
|
||||
|
||||
case Watcher::WRITABLE:
|
||||
$streamId = (int) $watcher->value;
|
||||
unset($this->writeWatchers[$streamId][$watcher->id]);
|
||||
if (empty($this->writeWatchers[$streamId])) {
|
||||
unset($this->writeWatchers[$streamId], $this->writeStreams[$streamId]);
|
||||
}
|
||||
break;
|
||||
|
||||
case Watcher::DELAY:
|
||||
case Watcher::REPEAT:
|
||||
unset($this->timerExpires[$watcher->id]);
|
||||
break;
|
||||
|
||||
case Watcher::SIGNAL:
|
||||
if (isset($this->signalWatchers[$watcher->value])) {
|
||||
unset($this->signalWatchers[$watcher->value][$watcher->id]);
|
||||
|
||||
if (empty($this->signalWatchers[$watcher->value])) {
|
||||
unset($this->signalWatchers[$watcher->value]);
|
||||
@\pcntl_signal($watcher->value, \SIG_DFL);
|
||||
}
|
||||
}
|
||||
break;
|
||||
|
||||
default: throw new \DomainException("Unknown watcher type");
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @param int $signo
|
||||
*/
|
||||
private function handleSignal($signo) {
|
||||
foreach ($this->signalWatchers[$signo] as $watcher) {
|
||||
if (!isset($this->signalWatchers[$signo][$watcher->id])) {
|
||||
continue;
|
||||
}
|
||||
|
||||
$callback = $watcher->callback;
|
||||
$callback($watcher->id, $signo, $watcher->data);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritdoc}
|
||||
*/
|
||||
public function getHandle() {
|
||||
return null;
|
||||
}
|
||||
}
|
255
lib/UvLoop.php
Normal file
255
lib/UvLoop.php
Normal file
@ -0,0 +1,255 @@
|
||||
<?php
|
||||
|
||||
namespace Amp\Loop;
|
||||
|
||||
use Amp\Loop\Internal\Watcher;
|
||||
|
||||
class UvLoop extends Loop {
|
||||
/** @var resource A uv_loop resource created with uv_loop_new() */
|
||||
private $handle;
|
||||
|
||||
/** @var resource[] */
|
||||
private $events = [];
|
||||
|
||||
/** @var \Amp\Loop\Internal\Watcher[]|\Amp\Loop\Internal\Watcher[][] */
|
||||
private $watchers = [];
|
||||
|
||||
/** @var resource[] */
|
||||
private $read = [];
|
||||
|
||||
/** @var resource[] */
|
||||
private $write = [];
|
||||
|
||||
/** @var callable */
|
||||
private $ioCallback;
|
||||
|
||||
/** @var callable */
|
||||
private $timerCallback;
|
||||
|
||||
/** @var callable */
|
||||
private $signalCallback;
|
||||
|
||||
public static function supported() {
|
||||
return \extension_loaded("uv");
|
||||
}
|
||||
|
||||
public function __construct() {
|
||||
$this->handle = \uv_loop_new();
|
||||
|
||||
$this->ioCallback = function ($event, $status, $events, $resource) {
|
||||
switch ($status) {
|
||||
case 0: // OK
|
||||
break;
|
||||
|
||||
// If $status is a severe error, stop the poll and throw an exception.
|
||||
case \UV::EACCES:
|
||||
case \UV::EBADF:
|
||||
case \UV::EINVAL:
|
||||
case \UV::ENOTSOCK:
|
||||
throw new \RuntimeException(
|
||||
\sprintf("UV_%s: %s", \uv_err_name($status), \ucfirst(\uv_strerror($status)))
|
||||
);
|
||||
|
||||
default: // Ignore other (probably) trivial warnings and continuing polling.
|
||||
return;
|
||||
}
|
||||
|
||||
$watchers = $this->watchers[(int) $event];
|
||||
|
||||
foreach ($watchers as $watcher) {
|
||||
$callback = $watcher->callback;
|
||||
$callback($watcher->id, $resource, $watcher->data);
|
||||
}
|
||||
};
|
||||
|
||||
$this->timerCallback = function ($event) {
|
||||
$watcher = $this->watchers[(int) $event];
|
||||
|
||||
if ($watcher->type & Watcher::DELAY) {
|
||||
$this->cancel($watcher->id);
|
||||
}
|
||||
|
||||
$callback = $watcher->callback;
|
||||
$callback($watcher->id, $watcher->data);
|
||||
};
|
||||
|
||||
$this->signalCallback = function ($event, $signo) {
|
||||
$watcher = $this->watchers[(int) $event];
|
||||
|
||||
$callback = $watcher->callback;
|
||||
$callback($watcher->id, $signo, $watcher->data);
|
||||
};
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritdoc}
|
||||
*/
|
||||
protected function dispatch($blocking) {
|
||||
\uv_run($this->handle, $blocking ? \UV::RUN_ONCE : \UV::RUN_NOWAIT);
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritdoc}
|
||||
*/
|
||||
protected function activate(array $watchers) {
|
||||
foreach ($watchers as $watcher) {
|
||||
$id = $watcher->id;
|
||||
|
||||
switch ($watcher->type) {
|
||||
case Watcher::READABLE:
|
||||
$streamId = (int) $watcher->value;
|
||||
|
||||
if (isset($this->read[$streamId])) {
|
||||
$event = $this->read[$streamId];
|
||||
} elseif (isset($this->events[$id])) {
|
||||
$event = $this->read[$streamId] = $this->events[$id];
|
||||
} else {
|
||||
$event = $this->read[$streamId] = \uv_poll_init_socket($this->handle, $watcher->value);
|
||||
}
|
||||
|
||||
$this->events[$id] = $event;
|
||||
$this->watchers[(int) $event][$id] = $watcher;
|
||||
|
||||
if (!\uv_is_active($event)) {
|
||||
\uv_poll_start($event, \UV::READABLE, $this->ioCallback);
|
||||
}
|
||||
break;
|
||||
|
||||
case Watcher::WRITABLE:
|
||||
$streamId = (int) $watcher->value;
|
||||
|
||||
if (isset($this->write[$streamId])) {
|
||||
$event = $this->write[$streamId];
|
||||
} elseif (isset($this->events[$id])) {
|
||||
$event = $this->write[$streamId] = $this->events[$id];
|
||||
} else {
|
||||
$event = $this->write[$streamId] = \uv_poll_init_socket($this->handle, $watcher->value);
|
||||
}
|
||||
|
||||
$this->events[$id] = $event;
|
||||
$this->watchers[(int) $event][$id] = $watcher;
|
||||
|
||||
|
||||
if (!\uv_is_active($event)) {
|
||||
\uv_poll_start($event, \UV::WRITABLE, $this->ioCallback);
|
||||
}
|
||||
break;
|
||||
|
||||
case Watcher::DELAY:
|
||||
case Watcher::REPEAT:
|
||||
if (isset($this->events[$id])) {
|
||||
$event = $this->events[$id];
|
||||
} else {
|
||||
$event = $this->events[$id] = \uv_timer_init($this->handle);
|
||||
}
|
||||
|
||||
$this->watchers[(int) $event] = $watcher;
|
||||
|
||||
\uv_timer_start(
|
||||
$event,
|
||||
$watcher->value,
|
||||
$watcher->type & Watcher::REPEAT ? $watcher->value : 0,
|
||||
$this->timerCallback
|
||||
);
|
||||
break;
|
||||
|
||||
case Watcher::SIGNAL:
|
||||
if (isset($this->events[$id])) {
|
||||
$event = $this->events[$id];
|
||||
} else {
|
||||
$event = $this->events[$id] = \uv_signal_init($this->handle);
|
||||
}
|
||||
|
||||
$this->watchers[(int) $event] = $watcher;
|
||||
|
||||
\uv_signal_start($event, $this->signalCallback, $watcher->value);
|
||||
break;
|
||||
|
||||
default: throw new \DomainException("Unknown watcher type");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritdoc}
|
||||
*/
|
||||
protected function deactivate(Watcher $watcher) {
|
||||
$id = $watcher->id;
|
||||
|
||||
if (!isset($this->events[$id])) {
|
||||
return;
|
||||
}
|
||||
|
||||
$event = $this->events[$id];
|
||||
$eventId = (int) $event;
|
||||
|
||||
switch ($watcher->type) {
|
||||
case Watcher::READABLE:
|
||||
unset($this->watchers[$eventId][$id]);
|
||||
|
||||
if (empty($this->watchers[$eventId])) {
|
||||
unset($this->watchers[$eventId]);
|
||||
unset($this->read[(int) $watcher->value]);
|
||||
if (\uv_is_active($event)) {
|
||||
\uv_poll_stop($event);
|
||||
}
|
||||
}
|
||||
break;
|
||||
|
||||
case Watcher::WRITABLE:
|
||||
unset($this->watchers[$eventId][$id]);
|
||||
|
||||
if (empty($this->watchers[$eventId])) {
|
||||
unset($this->watchers[$eventId]);
|
||||
unset($this->write[(int) $watcher->value]);
|
||||
if (\uv_is_active($event)) {
|
||||
\uv_poll_stop($event);
|
||||
}
|
||||
}
|
||||
break;
|
||||
|
||||
case Watcher::DELAY:
|
||||
case Watcher::REPEAT:
|
||||
unset($this->watchers[$eventId]);
|
||||
if (\uv_is_active($event)) {
|
||||
\uv_timer_stop($event);
|
||||
}
|
||||
break;
|
||||
|
||||
case Watcher::SIGNAL:
|
||||
unset($this->watchers[$eventId]);
|
||||
if (\uv_is_active($event)) {
|
||||
\uv_signal_stop($event);
|
||||
}
|
||||
break;
|
||||
|
||||
default: throw new \DomainException("Unknown watcher type");
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritdoc}
|
||||
*/
|
||||
public function cancel($watcherIdentifier) {
|
||||
parent::cancel($watcherIdentifier);
|
||||
|
||||
if (!isset($this->events[$watcherIdentifier])) {
|
||||
return;
|
||||
}
|
||||
|
||||
$event = $this->events[$watcherIdentifier];
|
||||
|
||||
if (empty($this->watchers[(int) $event])) {
|
||||
\uv_close($event);
|
||||
}
|
||||
|
||||
unset($this->events[$watcherIdentifier]);
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritdoc}
|
||||
*/
|
||||
public function getHandle() {
|
||||
return $this->handle;
|
||||
}
|
||||
}
|
12
phpunit.xml
12
phpunit.xml
@ -1,12 +0,0 @@
|
||||
<phpunit bootstrap="./vendor/autoload.php" colors="true">
|
||||
<testsuites>
|
||||
<testsuite name="Tests">
|
||||
<directory>./test</directory>
|
||||
</testsuite>
|
||||
</testsuites>
|
||||
<filter>
|
||||
<whitelist addUncoveredFilesFromWhitelist="true">
|
||||
<directory>./src</directory>
|
||||
</whitelist>
|
||||
</filter>
|
||||
</phpunit>
|
@ -1,21 +1,24 @@
|
||||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<phpunit
|
||||
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||
xsi:noNamespaceSchemaLocation="http://schema.phpunit.de/4.1/phpunit.xsd"
|
||||
backupGlobals="false"
|
||||
backupStaticAttributes="false"
|
||||
bootstrap="vendor/autoload.php"
|
||||
colors="true"
|
||||
convertErrorsToExceptions="true"
|
||||
convertNoticesToExceptions="true"
|
||||
convertWarningsToExceptions="true"
|
||||
processIsolation="false"
|
||||
stopOnFailure="false"
|
||||
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||
xsi:noNamespaceSchemaLocation="http://schema.phpunit.de/4.1/phpunit.xsd"
|
||||
backupGlobals="false"
|
||||
backupStaticAttributes="false"
|
||||
bootstrap="vendor/autoload.php"
|
||||
colors="true"
|
||||
convertErrorsToExceptions="true"
|
||||
convertNoticesToExceptions="true"
|
||||
convertWarningsToExceptions="true"
|
||||
processIsolation="false"
|
||||
stopOnFailure="false"
|
||||
>
|
||||
<testsuites>
|
||||
<testsuite name="Amp">
|
||||
<directory>test</directory>
|
||||
</testsuite>
|
||||
<testsuite name="Amp Loop PHPT">
|
||||
<directory suffix=".phpt">test/phpt</directory>
|
||||
</testsuite>
|
||||
</testsuites>
|
||||
<filter>
|
||||
<whitelist>
|
||||
@ -24,5 +27,6 @@
|
||||
</filter>
|
||||
<logging>
|
||||
<log type="coverage-html" target="build/coverage" title="Amp" highlight="true"/>
|
||||
<log type="coverage-clover" target="build/logs/clover.xml"/>
|
||||
</logging>
|
||||
</phpunit>
|
||||
</phpunit>
|
||||
|
21
test/EvLoopTest.php
Normal file
21
test/EvLoopTest.php
Normal file
@ -0,0 +1,21 @@
|
||||
<?php
|
||||
|
||||
namespace Amp\Test\Loop;
|
||||
|
||||
use Amp\Loop\EvLoop;
|
||||
use AsyncInterop\Loop\DriverFactory;
|
||||
use AsyncInterop\Loop\Test;
|
||||
|
||||
/**
|
||||
* @requires extension ev
|
||||
*/
|
||||
class EvLoopTest extends Test {
|
||||
public function getFactory() {
|
||||
$factory = $this->getMockBuilder(DriverFactory::class)->getMock();
|
||||
|
||||
$factory->method('create')
|
||||
->willReturn(new EvLoop);
|
||||
|
||||
return $factory;
|
||||
}
|
||||
}
|
21
test/EventLoopTest.php
Normal file
21
test/EventLoopTest.php
Normal file
@ -0,0 +1,21 @@
|
||||
<?php
|
||||
|
||||
namespace Amp\Test\Loop;
|
||||
|
||||
use Amp\Loop\EventLoop;
|
||||
use AsyncInterop\Loop\DriverFactory;
|
||||
use AsyncInterop\Loop\Test;
|
||||
|
||||
/**
|
||||
* @requires extension event
|
||||
*/
|
||||
class EventLoopTest extends Test {
|
||||
public function getFactory() {
|
||||
$factory = $this->getMockBuilder(DriverFactory::class)->getMock();
|
||||
|
||||
$factory->method('create')
|
||||
->willReturn(new EventLoop);
|
||||
|
||||
return $factory;
|
||||
}
|
||||
}
|
18
test/NativeLoopTest.php
Normal file
18
test/NativeLoopTest.php
Normal file
@ -0,0 +1,18 @@
|
||||
<?php
|
||||
|
||||
namespace Amp\Test\Loop;
|
||||
|
||||
use Amp\Loop\NativeLoop;
|
||||
use AsyncInterop\Loop\DriverFactory;
|
||||
use AsyncInterop\Loop\Test;
|
||||
|
||||
class NativeLoopTest extends Test {
|
||||
public function getFactory() {
|
||||
$factory = $this->getMockBuilder(DriverFactory::class)->getMock();
|
||||
|
||||
$factory->method('create')
|
||||
->willReturn(new NativeLoop());
|
||||
|
||||
return $factory;
|
||||
}
|
||||
}
|
21
test/UvLoopTest.php
Normal file
21
test/UvLoopTest.php
Normal file
@ -0,0 +1,21 @@
|
||||
<?php
|
||||
|
||||
namespace Amp\Test\Loop;
|
||||
|
||||
use Amp\Loop\UvLoop;
|
||||
use AsyncInterop\Loop\DriverFactory;
|
||||
use AsyncInterop\Loop\Test;
|
||||
|
||||
/**
|
||||
* @requires extension uv
|
||||
*/
|
||||
class UvLoopTest extends Test {
|
||||
public function getFactory() {
|
||||
$factory = $this->getMockBuilder(DriverFactory::class)->getMock();
|
||||
|
||||
$factory->method('create')
|
||||
->willReturn(new UvLoop);
|
||||
|
||||
return $factory;
|
||||
}
|
||||
}
|
Loading…
x
Reference in New Issue
Block a user