1
0
mirror of https://github.com/danog/amp.git synced 2024-12-02 17:37:50 +01:00
amp/lib/Loop/EventDriver.php

323 lines
8.6 KiB
PHP
Raw Normal View History

<?php
namespace Amp\Loop;
use Amp\Coroutine;
use Amp\Promise;
use React\Promise\PromiseInterface as ReactPromise;
use function Amp\Internal\getCurrentTime;
use function Amp\Promise\rethrow;
2018-06-18 20:00:01 +02:00
class EventDriver extends Driver
{
/** @var \Event[]|null */
private static $activeSignals;
2018-01-06 03:32:57 +01:00
2017-02-17 05:36:32 +01:00
/** @var \EventBase */
private $handle;
2018-01-06 03:32:57 +01:00
2017-02-17 05:36:32 +01:00
/** @var \Event[] */
private $events = [];
2018-01-06 03:32:57 +01:00
2017-02-17 05:36:32 +01:00
/** @var callable */
private $ioCallback;
2018-01-06 03:32:57 +01:00
2017-02-17 05:36:32 +01:00
/** @var callable */
private $timerCallback;
2018-01-06 03:32:57 +01:00
2017-02-17 05:36:32 +01:00
/** @var callable */
private $signalCallback;
2018-01-06 03:32:57 +01:00
2017-02-17 05:36:32 +01:00
/** @var \Event[] */
private $signals = [];
2018-01-06 03:32:57 +01:00
/** @var bool */
private $nowUpdateNeeded = false;
/** @var int Internal timestamp for now. */
2018-11-26 17:33:03 +01:00
private $now;
2018-01-06 03:32:57 +01:00
/** @var int Loop time offset */
2018-01-06 03:32:57 +01:00
private $nowOffset;
2017-03-10 19:19:32 +01:00
2018-06-18 20:00:01 +02:00
public function __construct()
{
$this->handle = new \EventBase;
$this->nowOffset = getCurrentTime();
2018-11-26 17:33:03 +01:00
$this->now = \random_int(0, $this->nowOffset);
$this->nowOffset -= $this->now;
2017-03-10 19:19:32 +01:00
if (self::$activeSignals === null) {
self::$activeSignals = &$this->signals;
}
2017-03-10 19:19:32 +01:00
$this->ioCallback = function ($resource, $what, Watcher $watcher) {
try {
$result = ($watcher->callback)($watcher->id, $watcher->value, $watcher->data);
if ($result === null) {
return;
}
if ($result instanceof \Generator) {
$result = new Coroutine($result);
}
if ($result instanceof Promise || $result instanceof ReactPromise) {
rethrow($result);
}
} catch (\Throwable $exception) {
$this->error($exception);
}
};
$this->timerCallback = function ($resource, $what, Watcher $watcher) {
if ($watcher->type & Watcher::DELAY) {
$this->cancel($watcher->id);
} else {
$this->events[$watcher->id]->add($watcher->value / self::MILLISEC_PER_SEC);
}
try {
$result = ($watcher->callback)($watcher->id, $watcher->data);
2017-03-12 18:03:13 +01:00
if ($result === null) {
return;
}
if ($result instanceof \Generator) {
$result = new Coroutine($result);
}
if ($result instanceof Promise || $result instanceof ReactPromise) {
rethrow($result);
}
} catch (\Throwable $exception) {
$this->error($exception);
}
};
$this->signalCallback = function ($signum, $what, Watcher $watcher) {
try {
$result = ($watcher->callback)($watcher->id, $watcher->value, $watcher->data);
if ($result === null) {
return;
}
if ($result instanceof \Generator) {
$result = new Coroutine($result);
}
if ($result instanceof Promise || $result instanceof ReactPromise) {
rethrow($result);
}
} catch (\Throwable $exception) {
$this->error($exception);
}
};
}
2017-03-10 19:19:32 +01:00
/**
* {@inheritdoc}
*/
2018-06-18 20:00:01 +02:00
public function cancel(string $watcherId)
{
parent::cancel($watcherId);
if (isset($this->events[$watcherId])) {
$this->events[$watcherId]->free();
unset($this->events[$watcherId]);
}
}
2018-06-18 20:00:01 +02:00
public static function isSupported(): bool
{
return \extension_loaded("event");
}
2017-05-02 19:30:34 +02:00
/**
* @codeCoverageIgnore
*/
2018-06-18 20:00:01 +02:00
public function __destruct()
{
foreach ($this->events as $event) {
if ($event !== null) { // Events may have been nulled in extension depending on destruct order.
$event->free();
}
}
// Unset here, otherwise $event->del() fails with a warning, because __destruct order isn't defined.
// See https://github.com/amphp/amp/issues/159.
$this->events = [];
// Manually free the loop handle to fully release loop resources.
// See https://github.com/amphp/amp/issues/177.
if ($this->handle !== null) {
$this->handle->free();
$this->handle = null;
}
}
2017-03-10 19:19:32 +01:00
/**
* {@inheritdoc}
*/
2018-06-18 20:00:01 +02:00
public function run()
{
$active = self::$activeSignals;
2017-03-10 19:19:32 +01:00
foreach ($active as $event) {
$event->del();
}
2017-03-10 19:19:32 +01:00
self::$activeSignals = &$this->signals;
2017-03-10 19:19:32 +01:00
foreach ($this->signals as $event) {
$event->add();
}
2017-03-10 19:19:32 +01:00
try {
parent::run();
} finally {
foreach ($this->signals as $event) {
$event->del();
}
2017-03-10 19:19:32 +01:00
self::$activeSignals = &$active;
2017-03-10 19:19:32 +01:00
foreach ($active as $event) {
$event->add();
}
}
}
/**
* {@inheritdoc}
*/
2018-06-18 20:00:01 +02:00
public function stop()
{
$this->handle->stop();
parent::stop();
}
2018-01-06 03:32:57 +01:00
/**
* {@inheritdoc}
*/
public function now(): int
{
if ($this->nowUpdateNeeded) {
$this->now = getCurrentTime() - $this->nowOffset;
2018-01-06 03:32:57 +01:00
$this->nowUpdateNeeded = false;
}
return $this->now;
}
/**
* {@inheritdoc}
*/
2018-06-18 20:00:01 +02:00
public function getHandle(): \EventBase
{
return $this->handle;
}
/**
* {@inheritdoc}
*/
2018-06-18 20:00:01 +02:00
protected function dispatch(bool $blocking)
{
2018-01-06 03:32:57 +01:00
$this->nowUpdateNeeded = true;
$this->handle->loop($blocking ? \EventBase::LOOP_ONCE : \EventBase::LOOP_ONCE | \EventBase::LOOP_NONBLOCK);
}
/**
* {@inheritdoc}
*/
2018-06-18 20:00:01 +02:00
protected function activate(array $watchers)
{
$now = getCurrentTime() - $this->nowOffset;
foreach ($watchers as $watcher) {
if (!isset($this->events[$id = $watcher->id])) {
switch ($watcher->type) {
case Watcher::READABLE:
$this->events[$id] = new \Event(
$this->handle,
$watcher->value,
\Event::READ | \Event::PERSIST,
$this->ioCallback,
$watcher
);
break;
case Watcher::WRITABLE:
$this->events[$id] = new \Event(
$this->handle,
$watcher->value,
\Event::WRITE | \Event::PERSIST,
$this->ioCallback,
$watcher
);
break;
case Watcher::DELAY:
case Watcher::REPEAT:
$this->events[$id] = new \Event(
$this->handle,
-1,
\Event::TIMEOUT,
$this->timerCallback,
$watcher
);
break;
case Watcher::SIGNAL:
$this->events[$id] = new \Event(
$this->handle,
$watcher->value,
\Event::SIGNAL | \Event::PERSIST,
$this->signalCallback,
$watcher
);
break;
default:
// @codeCoverageIgnoreStart
throw new \Error("Unknown watcher type");
2018-06-18 20:00:01 +02:00
// @codeCoverageIgnoreEnd
}
}
2017-03-10 19:19:32 +01:00
switch ($watcher->type) {
case Watcher::DELAY:
case Watcher::REPEAT:
2018-01-06 03:32:57 +01:00
$interval = $watcher->value - ($now - $this->now());
$this->events[$id]->add($interval > 0 ? $interval / self::MILLISEC_PER_SEC : 0);
break;
2017-03-10 19:19:32 +01:00
case Watcher::SIGNAL:
$this->signals[$id] = $this->events[$id];
2018-06-18 20:00:01 +02:00
// no break
2017-03-10 19:19:32 +01:00
default:
$this->events[$id]->add();
break;
}
}
}
/**
* {@inheritdoc}
*/
2018-06-18 20:00:01 +02:00
protected function deactivate(Watcher $watcher)
{
if (isset($this->events[$id = $watcher->id])) {
$this->events[$id]->del();
2017-03-10 19:19:32 +01:00
if ($watcher->type === Watcher::SIGNAL) {
unset($this->signals[$id]);
}
}
}
}