1
0
mirror of https://github.com/danog/amp.git synced 2024-11-27 12:35:02 +01:00
amp/lib/LibeventReactor.php

298 lines
8.7 KiB
PHP
Raw Normal View History

2013-08-05 22:05:08 +02:00
<?php
namespace Alert;
2014-04-23 05:09:42 +02:00
class LibeventReactor implements SignalReactor {
2013-08-05 22:05:08 +02:00
private $base;
private $watchers = [];
2014-06-11 18:21:46 +02:00
private $lastWatcherId = 0;
private $resolution = 1000;
2014-04-23 05:09:42 +02:00
private $isRunning = FALSE;
private $isGCScheduled = FALSE;
2013-08-05 22:05:08 +02:00
private $gcEvent;
private $stopException;
2014-02-23 22:26:28 +01:00
public function __construct() {
$this->base = event_base_new();
2013-08-05 22:05:08 +02:00
$this->gcEvent = event_new();
event_timer_set($this->gcEvent, [$this, 'collectGarbage']);
event_base_set($this->gcEvent, $this->base);
}
2014-02-23 22:26:28 +01:00
public function run(callable $onStart = NULL) {
if ($this->isRunning) {
return;
}
if ($onStart) {
$this->immediately(function() use ($onStart) { $onStart($this); });
2013-08-05 22:05:08 +02:00
}
$this->doRun();
2013-08-05 22:05:08 +02:00
}
2014-02-23 22:26:28 +01:00
public function tick() {
2013-08-05 22:05:08 +02:00
if (!$this->isRunning) {
$this->doRun(EVLOOP_ONCE | EVLOOP_NONBLOCK);
}
}
private function doRun($flags = 0) {
2014-04-23 05:09:42 +02:00
$this->isRunning = TRUE;
2013-08-05 22:05:08 +02:00
event_base_loop($this->base, $flags);
2014-04-23 05:09:42 +02:00
$this->isRunning = FALSE;
2013-08-05 22:05:08 +02:00
if ($this->stopException) {
$e = $this->stopException;
2014-06-11 16:16:22 +02:00
$this->stopException = NULL;
2013-08-05 22:05:08 +02:00
throw $e;
}
}
2014-02-23 22:26:28 +01:00
public function stop() {
2013-08-05 22:05:08 +02:00
event_base_loopexit($this->base);
}
2014-02-23 22:26:28 +01:00
public function at(callable $callback, $timeString) {
2013-08-05 22:05:08 +02:00
$now = time();
$executeAt = @strtotime($timeString);
2014-04-23 05:09:42 +02:00
if ($executeAt === FALSE || $executeAt <= $now) {
2013-08-05 22:05:08 +02:00
throw new \InvalidArgumentException(
'Valid future time string (parsable by strtotime()) required'
);
}
$msDelay = ($executeAt - $now) * $this->resolution;
return $this->once($callback, $msDelay);
2013-08-05 22:05:08 +02:00
}
2014-02-23 22:26:28 +01:00
public function immediately(callable $callback) {
2014-06-11 18:21:46 +02:00
return $this->once($callback, $msDelay = 0);
2013-08-05 22:05:08 +02:00
}
2014-06-11 18:21:46 +02:00
public function once(callable $callback, $msDelay) {
$watcherId = $this->lastWatcherId++;
$eventResource = event_new();
2014-06-11 18:21:46 +02:00
$msDelay = ($msDelay > 0) ? ($msDelay * $this->resolution) : 0;
2013-08-05 22:05:08 +02:00
$watcher = new LibeventWatcher;
$watcher->id = $watcherId;
$watcher->eventResource = $eventResource;
2014-06-11 18:21:46 +02:00
$watcher->msDelay = $msDelay;
$watcher->callback = $callback;
$watcher->wrapper = $this->wrapOnceCallback($watcher);
$this->watchers[$watcherId] = $watcher;
event_timer_set($eventResource, $watcher->wrapper);
event_base_set($eventResource, $this->base);
2014-06-11 18:21:46 +02:00
event_add($eventResource, $msDelay);
return $watcherId;
}
private function wrapOnceCallback(LibeventWatcher $watcher) {
$callback = $watcher->callback;
$watcherId = $watcher->id;
return function() use ($callback, $watcherId) {
2013-08-05 22:05:08 +02:00
try {
$callback($watcherId, $this);
2013-08-05 22:05:08 +02:00
$this->cancel($watcherId);
} catch (\Exception $e) {
$this->stopException = $e;
$this->stop();
}
};
}
2013-08-05 22:05:08 +02:00
2014-06-11 18:21:46 +02:00
public function repeat(callable $callback, $msDelay) {
$watcherId = $this->lastWatcherId++;
$msDelay = ($msDelay > 0) ? ($msDelay * $this->resolution) : 0;
$eventResource = event_new();
$watcher = new LibeventWatcher;
$watcher->id = $watcherId;
$watcher->eventResource = $eventResource;
2014-06-11 18:21:46 +02:00
$watcher->msDelay = $msDelay;
$watcher->callback = $callback;
2013-11-27 17:56:29 +01:00
$watcher->wrapper = $this->wrapRepeatingCallback($watcher);
2013-08-05 22:05:08 +02:00
$this->watchers[$watcherId] = $watcher;
event_timer_set($eventResource, $watcher->wrapper);
event_base_set($eventResource, $this->base);
2014-06-11 18:21:46 +02:00
event_add($eventResource, $msDelay);
2013-08-05 22:05:08 +02:00
return $watcherId;
}
private function wrapRepeatingCallback(LibeventWatcher $watcher) {
$callback = $watcher->callback;
$watcherId = $watcher->id;
$eventResource = $watcher->eventResource;
2014-06-11 18:21:46 +02:00
$msDelay = $watcher->msDelay;
2013-08-05 22:05:08 +02:00
2014-06-11 18:21:46 +02:00
return function() use ($callback, $eventResource, $msDelay, $watcherId) {
2013-08-05 22:05:08 +02:00
try {
$callback($watcherId, $this);
2014-06-11 18:21:46 +02:00
event_add($eventResource, $msDelay);
2013-08-05 22:05:08 +02:00
} catch (\Exception $e) {
$this->stopException = $e;
$this->stop();
}
};
}
2014-04-23 05:09:42 +02:00
public function onReadable($stream, callable $callback, $enableNow = TRUE) {
2013-08-05 22:05:08 +02:00
return $this->watchIoStream($stream, EV_READ | EV_PERSIST, $callback, $enableNow);
}
2014-04-23 05:09:42 +02:00
public function onWritable($stream, callable $callback, $enableNow = TRUE) {
2013-08-05 22:05:08 +02:00
return $this->watchIoStream($stream, EV_WRITE | EV_PERSIST, $callback, $enableNow);
}
private function watchIoStream($stream, $flags, callable $callback, $enableNow) {
2014-06-11 18:21:46 +02:00
$watcherId = $this->lastWatcherId++;
$eventResource = event_new();
$watcher = new LibeventWatcher;
$watcher->id = $watcherId;
$watcher->stream = $stream;
$watcher->callback = $callback;
$watcher->wrapper = $this->wrapStreamCallback($watcher);
$watcher->isEnabled = (bool) $enableNow;
$watcher->eventResource = $eventResource;
2013-08-05 22:05:08 +02:00
$this->watchers[$watcherId] = $watcher;
event_set($eventResource, $stream, $flags, $watcher->wrapper);
event_base_set($eventResource, $this->base);
if ($enableNow) {
event_add($eventResource);
}
return $watcherId;
}
private function wrapStreamCallback(LibeventWatcher $watcher) {
$callback = $watcher->callback;
$watcherId = $watcher->id;
return function($stream) use ($callback, $watcherId) {
2013-08-05 22:05:08 +02:00
try {
$callback($watcherId, $stream, $this);
2013-08-05 22:05:08 +02:00
} catch (\Exception $e) {
$this->stopException = $e;
$this->stop();
}
};
}
2014-06-11 18:21:46 +02:00
public function watchStream($stream, $flags, callable $callback) {
$flags = (int) $flags;
$enableNow = ($flags & self::WATCH_NOW);
2014-06-11 18:21:46 +02:00
if ($flags & self::WATCH_READ) {
2014-06-11 18:21:46 +02:00
return $this->onWritable($stream, $callback, $enableNow);
} elseif ($flags & self::WATCH_WRITE) {
2014-06-11 18:21:46 +02:00
return $this->onWritable($stream, $callback, $enableNow);
} else {
throw new \DomainException(
'Stream watchers must specify either a WATCH_READ or WATCH_WRITE flag'
2014-06-11 18:21:46 +02:00
);
}
}
2014-04-23 02:45:20 +02:00
public function onSignal($signal, callable $callback) {
2014-04-23 05:09:42 +02:00
$signal = (int) $signal;
2014-06-11 18:21:46 +02:00
$watcherId = $this->lastWatcherId++;
2014-04-23 02:45:20 +02:00
$eventResource = event_new();
$watcher = new LibeventWatcher;
$watcher->id = $watcherId;
$watcher->eventResource = $eventResource;
$watcher->callback = $callback;
$watcher->wrapper = $this->wrapSignalCallback($watcher);
$this->watchers[$watcherId] = $watcher;
event_set($eventResource, $signal, EV_SIGNAL | EV_PERSIST, $watcher->wrapper);
event_base_set($eventResource, $this->base);
event_add($eventResource);
return $watcherId;
}
private function wrapSignalCallback(LibeventWatcher $watcher) {
$callback = $watcher->callback;
$watcherId = $watcher->id;
return function() use ($callback, $watcherId) {
try {
$callback($watcherId, $this);
} catch (\Exception $e) {
$this->stopException = $e;
$this->stop();
}
};
}
2014-02-23 22:26:28 +01:00
public function cancel($watcherId) {
if (!isset($this->watchers[$watcherId])) {
return;
2013-08-05 22:05:08 +02:00
}
$watcher = $this->watchers[$watcherId];
event_del($watcher->eventResource);
$this->garbage[] = $watcher;
$this->scheduleGarbageCollection();
unset($this->watchers[$watcherId]);
2013-08-05 22:05:08 +02:00
}
2014-02-23 22:26:28 +01:00
public function disable($watcherId) {
2013-08-05 22:05:08 +02:00
if (!isset($this->watchers[$watcherId])) {
return;
}
$watcher = $this->watchers[$watcherId];
if ($watcher->isEnabled) {
event_del($watcher->eventResource);
2014-04-23 05:09:42 +02:00
$watcher->isEnabled = FALSE;
2013-08-05 22:05:08 +02:00
}
}
2014-02-23 22:26:28 +01:00
public function enable($watcherId) {
2013-08-05 22:05:08 +02:00
if (!isset($this->watchers[$watcherId])) {
return;
}
$watcher = $this->watchers[$watcherId];
2013-08-05 22:05:08 +02:00
if (!$watcher->isEnabled) {
2014-06-11 18:21:46 +02:00
event_add($watcher->eventResource, $watcher->msDelay);
2014-04-23 05:09:42 +02:00
$watcher->isEnabled = TRUE;
2013-08-05 22:05:08 +02:00
}
}
private function scheduleGarbageCollection() {
if (!$this->isGCScheduled) {
event_add($this->gcEvent, 0);
2014-04-23 05:09:42 +02:00
$this->isGCScheduled = TRUE;
2013-08-05 22:05:08 +02:00
}
}
private function collectGarbage() {
$this->garbage = [];
2014-04-23 05:09:42 +02:00
$this->isGCScheduled = FALSE;
2013-08-05 22:05:08 +02:00
event_del($this->gcEvent);
}
}