2013-08-05 22:05:08 +02:00
|
|
|
<?php
|
|
|
|
|
|
|
|
namespace Alert;
|
|
|
|
|
2013-08-29 08:38:13 +02:00
|
|
|
class LibeventReactor implements Reactor, Forkable {
|
2013-08-05 22:05:08 +02:00
|
|
|
|
|
|
|
private $base;
|
|
|
|
private $watchers = [];
|
|
|
|
private $lastWatcherId = 0;
|
|
|
|
private $resolution = 1000000;
|
|
|
|
private $isRunning = FALSE;
|
|
|
|
private $isGCScheduled = FALSE;
|
|
|
|
private $gcEvent;
|
|
|
|
private $stopException;
|
|
|
|
|
2013-09-11 19:00:43 +02:00
|
|
|
private static $TYPE_STREAM = 0;
|
|
|
|
private static $TYPE_ONCE = 1;
|
|
|
|
private static $TYPE_REPEATING = 2;
|
|
|
|
|
2013-08-05 22:05:08 +02:00
|
|
|
function __construct() {
|
2013-09-11 19:00:43 +02:00
|
|
|
$this->initialize();
|
2013-08-29 08:38:13 +02:00
|
|
|
}
|
|
|
|
|
2013-09-11 19:00:43 +02:00
|
|
|
/**
|
|
|
|
* Normally this would go into the __construct() function but it's split out into its own
|
|
|
|
* method because we also have to initialize() when calling afterFork().
|
|
|
|
*/
|
|
|
|
private function initialize() {
|
|
|
|
$this->base = event_base_new();
|
2013-08-05 22:05:08 +02:00
|
|
|
$this->gcEvent = event_new();
|
|
|
|
event_timer_set($this->gcEvent, [$this, 'collectGarbage']);
|
|
|
|
event_base_set($this->gcEvent, $this->base);
|
|
|
|
}
|
|
|
|
|
|
|
|
function run() {
|
|
|
|
if (!$this->isRunning) {
|
|
|
|
$this->doRun();
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
function tick() {
|
|
|
|
if (!$this->isRunning) {
|
|
|
|
$this->doRun(EVLOOP_ONCE | EVLOOP_NONBLOCK);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
private function doRun($flags = 0) {
|
|
|
|
$this->isRunning = TRUE;
|
|
|
|
event_base_loop($this->base, $flags);
|
|
|
|
$this->isRunning = FALSE;
|
|
|
|
|
|
|
|
if ($this->stopException) {
|
|
|
|
$e = $this->stopException;
|
|
|
|
throw $e;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
function stop() {
|
|
|
|
event_base_loopexit($this->base);
|
|
|
|
}
|
2013-08-29 08:38:13 +02:00
|
|
|
|
2013-08-05 22:05:08 +02:00
|
|
|
function at(callable $callback, $timeString) {
|
|
|
|
$now = time();
|
|
|
|
$executeAt = @strtotime($timeString);
|
2013-08-29 08:38:13 +02:00
|
|
|
|
2013-08-05 22:05:08 +02:00
|
|
|
if ($executeAt === FALSE && $executeAt <= $now) {
|
|
|
|
throw new \InvalidArgumentException(
|
|
|
|
'Valid future time string (parsable by strtotime()) required'
|
|
|
|
);
|
|
|
|
}
|
2013-08-29 08:38:13 +02:00
|
|
|
|
2013-08-05 22:05:08 +02:00
|
|
|
$delay = $executeAt - $now;
|
2013-08-29 08:38:13 +02:00
|
|
|
|
2013-08-05 22:05:08 +02:00
|
|
|
return $this->once($callback, $delay);
|
|
|
|
}
|
|
|
|
|
|
|
|
function immediately(callable $callback) {
|
|
|
|
return $this->once($callback, $delay = 0);
|
|
|
|
}
|
|
|
|
|
|
|
|
function once(callable $callback, $delay) {
|
|
|
|
$watcherId = $this->getNextWatcherId();
|
2013-09-11 19:00:43 +02:00
|
|
|
$eventResource = event_new();
|
2013-08-05 22:05:08 +02:00
|
|
|
$delay = ($delay > 0) ? ($delay * $this->resolution) : 0;
|
|
|
|
|
2013-09-11 19:00:43 +02:00
|
|
|
$watcher = new LibeventWatcher;
|
|
|
|
$watcher->id = $watcherId;
|
|
|
|
$watcher->type = self::$TYPE_ONCE;
|
|
|
|
$watcher->eventResource = $eventResource;
|
|
|
|
$watcher->interval = $delay;
|
|
|
|
$watcher->callback = $callback;
|
|
|
|
|
|
|
|
$watcher->wrapper = $this->wrapOnceCallback($watcher);
|
|
|
|
|
|
|
|
$this->watchers[$watcherId] = $watcher;
|
|
|
|
|
|
|
|
event_timer_set($eventResource, $watcher->wrapper);
|
|
|
|
event_base_set($eventResource, $this->base);
|
|
|
|
event_add($eventResource, $delay);
|
|
|
|
|
|
|
|
return $watcherId;
|
|
|
|
}
|
|
|
|
|
|
|
|
private function wrapOnceCallback(LibeventWatcher $watcher) {
|
|
|
|
$callback = $watcher->callback;
|
|
|
|
$watcherId = $watcher->id;
|
|
|
|
|
|
|
|
return function() use ($callback, $watcherId) {
|
2013-08-05 22:05:08 +02:00
|
|
|
try {
|
2013-09-08 15:46:25 +02:00
|
|
|
$callback($watcherId, $this);
|
2013-08-05 22:05:08 +02:00
|
|
|
$this->cancel($watcherId);
|
|
|
|
} catch (\Exception $e) {
|
|
|
|
$this->stopException = $e;
|
|
|
|
$this->stop();
|
|
|
|
}
|
|
|
|
};
|
2013-09-11 19:00:43 +02:00
|
|
|
}
|
2013-08-05 22:05:08 +02:00
|
|
|
|
2013-09-11 19:00:43 +02:00
|
|
|
function repeat(callable $callback, $interval) {
|
|
|
|
$watcherId = $this->getNextWatcherId();
|
|
|
|
$interval = ($interval > 0) ? ($interval * $this->resolution) : 0;
|
|
|
|
$eventResource = event_new();
|
|
|
|
|
|
|
|
$watcher = new LibeventWatcher;
|
|
|
|
$watcher->id = $watcherId;
|
|
|
|
$watcher->type = self::$TYPE_REPEATING;
|
|
|
|
$watcher->eventResource = $eventResource;
|
|
|
|
$watcher->interval = $interval;
|
|
|
|
$watcher->callback = $callback;
|
|
|
|
$watcher->isRepeating = TRUE;
|
|
|
|
|
|
|
|
$watcher->wrapper = $this->wrapRepeatingCallback($watcher);
|
2013-08-05 22:05:08 +02:00
|
|
|
|
2013-09-11 19:00:43 +02:00
|
|
|
$this->watchers[$watcherId] = $watcher;
|
|
|
|
|
|
|
|
event_timer_set($eventResource, $watcher->wrapper);
|
|
|
|
event_base_set($eventResource, $this->base);
|
|
|
|
event_add($eventResource, $interval);
|
2013-08-05 22:05:08 +02:00
|
|
|
|
|
|
|
return $watcherId;
|
|
|
|
}
|
|
|
|
|
2013-09-11 19:00:43 +02:00
|
|
|
private function wrapRepeatingCallback(LibeventWatcher $watcher) {
|
|
|
|
$callback = $watcher->callback;
|
|
|
|
$watcherId = $watcher->id;
|
|
|
|
$eventResource = $watcher->eventResource;
|
|
|
|
$interval = $watcher->interval;
|
2013-08-05 22:05:08 +02:00
|
|
|
|
2013-09-11 19:00:43 +02:00
|
|
|
return function() use ($callback, $eventResource, $interval, $watcherId) {
|
2013-08-05 22:05:08 +02:00
|
|
|
try {
|
2013-09-08 15:46:25 +02:00
|
|
|
$callback($watcherId, $this);
|
2013-09-11 19:00:43 +02:00
|
|
|
event_add($eventResource, $interval);
|
2013-08-05 22:05:08 +02:00
|
|
|
} catch (\Exception $e) {
|
|
|
|
$this->stopException = $e;
|
|
|
|
$this->stop();
|
|
|
|
}
|
|
|
|
};
|
|
|
|
}
|
|
|
|
|
|
|
|
function onReadable($stream, callable $callback, $enableNow = TRUE) {
|
|
|
|
return $this->watchIoStream($stream, EV_READ | EV_PERSIST, $callback, $enableNow);
|
|
|
|
}
|
|
|
|
|
|
|
|
function onWritable($stream, callable $callback, $enableNow = TRUE) {
|
|
|
|
return $this->watchIoStream($stream, EV_WRITE | EV_PERSIST, $callback, $enableNow);
|
|
|
|
}
|
|
|
|
|
|
|
|
private function watchIoStream($stream, $flags, callable $callback, $enableNow) {
|
|
|
|
$watcherId = $this->getNextWatcherId();
|
2013-09-11 19:00:43 +02:00
|
|
|
$eventResource = event_new();
|
|
|
|
|
|
|
|
$watcher = new LibeventWatcher;
|
|
|
|
$watcher->id = $watcherId;
|
|
|
|
$watcher->type = self::$TYPE_STREAM;
|
|
|
|
$watcher->stream = $stream;
|
|
|
|
$watcher->streamFlags = $flags;
|
|
|
|
$watcher->callback = $callback;
|
|
|
|
$watcher->wrapper = $this->wrapStreamCallback($watcher);
|
|
|
|
$watcher->isEnabled = (bool) $enableNow;
|
|
|
|
$watcher->eventResource = $eventResource;
|
2013-08-05 22:05:08 +02:00
|
|
|
|
2013-09-11 19:00:43 +02:00
|
|
|
$this->watchers[$watcherId] = $watcher;
|
|
|
|
|
|
|
|
event_set($eventResource, $stream, $flags, $watcher->wrapper);
|
|
|
|
event_base_set($eventResource, $this->base);
|
|
|
|
|
|
|
|
if ($enableNow) {
|
|
|
|
event_add($eventResource);
|
|
|
|
}
|
|
|
|
|
|
|
|
return $watcherId;
|
|
|
|
}
|
|
|
|
|
|
|
|
private function wrapStreamCallback(LibeventWatcher $watcher) {
|
|
|
|
$callback = $watcher->callback;
|
|
|
|
$watcherId = $watcher->id;
|
|
|
|
|
|
|
|
return function($stream) use ($callback, $watcherId) {
|
2013-08-05 22:05:08 +02:00
|
|
|
try {
|
2013-09-08 15:46:25 +02:00
|
|
|
$callback($watcherId, $stream, $this);
|
2013-08-05 22:05:08 +02:00
|
|
|
} catch (\Exception $e) {
|
|
|
|
$this->stopException = $e;
|
|
|
|
$this->stop();
|
|
|
|
}
|
|
|
|
};
|
|
|
|
}
|
|
|
|
|
|
|
|
function cancel($watcherId) {
|
2013-09-11 19:00:43 +02:00
|
|
|
if (!isset($this->watchers[$watcherId])) {
|
|
|
|
return;
|
2013-08-05 22:05:08 +02:00
|
|
|
}
|
2013-09-11 19:00:43 +02:00
|
|
|
|
|
|
|
$watcher = $this->watchers[$watcherId];
|
|
|
|
event_del($watcher->eventResource);
|
|
|
|
$this->garbage[] = $watcher;
|
|
|
|
$this->scheduleGarbageCollection();
|
|
|
|
unset($this->watchers[$watcherId]);
|
2013-08-05 22:05:08 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
function disable($watcherId) {
|
|
|
|
if (!isset($this->watchers[$watcherId])) {
|
|
|
|
return;
|
|
|
|
}
|
|
|
|
|
2013-09-11 19:00:43 +02:00
|
|
|
$watcher = $this->watchers[$watcherId];
|
|
|
|
if ($watcher->isEnabled) {
|
|
|
|
event_del($watcher->eventResource);
|
|
|
|
$watcher->isEnabled = FALSE;
|
2013-08-05 22:05:08 +02:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
function enable($watcherId) {
|
|
|
|
if (!isset($this->watchers[$watcherId])) {
|
|
|
|
return;
|
|
|
|
}
|
|
|
|
|
2013-09-11 19:00:43 +02:00
|
|
|
$watcher = $this->watchers[$watcherId];
|
2013-08-05 22:05:08 +02:00
|
|
|
|
2013-09-11 19:00:43 +02:00
|
|
|
if (!$watcher->isEnabled) {
|
|
|
|
event_add($watcher->eventResource, $watcher->interval);
|
|
|
|
$watcher->isEnabled = TRUE;
|
2013-08-05 22:05:08 +02:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
private function getNextWatcherId() {
|
|
|
|
if (($watcherId = ++$this->lastWatcherId) === PHP_INT_MAX) {
|
|
|
|
$this->lastWatcherId = 0;
|
|
|
|
}
|
|
|
|
|
|
|
|
return $watcherId;
|
|
|
|
}
|
|
|
|
|
|
|
|
private function scheduleGarbageCollection() {
|
|
|
|
if (!$this->isGCScheduled) {
|
|
|
|
event_add($this->gcEvent, 0);
|
|
|
|
$this->isGCScheduled = TRUE;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
private function collectGarbage() {
|
|
|
|
$this->garbage = [];
|
|
|
|
$this->isGCScheduled = FALSE;
|
|
|
|
event_del($this->gcEvent);
|
|
|
|
}
|
2013-08-29 08:38:13 +02:00
|
|
|
|
|
|
|
function beforeFork() {
|
|
|
|
$this->collectGarbage();
|
|
|
|
}
|
|
|
|
|
|
|
|
function afterFork() {
|
2013-09-11 19:00:43 +02:00
|
|
|
$this->initialize();
|
|
|
|
|
|
|
|
foreach ($this->watchers as $watcherId => $watcher) {
|
|
|
|
$eventResource = event_new();
|
|
|
|
$watcher->eventResource = $eventResource;
|
|
|
|
|
|
|
|
switch ($watcher->type) {
|
|
|
|
case self::$TYPE_STREAM:
|
|
|
|
$wrapper = $this->wrapStreamCallback($watcher);
|
|
|
|
event_set($eventResource, $watcher->stream, $watcher->streamFlags, $wrapper);
|
|
|
|
break;
|
|
|
|
|
|
|
|
case self::$TYPE_ONCE:
|
|
|
|
$wrapper = $this->wrapOnceCallback($watcher);
|
|
|
|
event_timer_set($eventResource, $wrapper);
|
|
|
|
break;
|
|
|
|
|
|
|
|
case self::$TYPE_REPEATING:
|
|
|
|
$wrapper = $this->wrapRepeatingCallback($watcher);
|
|
|
|
event_timer_set($eventResource, $wrapper);
|
|
|
|
break;
|
|
|
|
}
|
2013-08-29 08:38:13 +02:00
|
|
|
|
2013-09-11 19:00:43 +02:00
|
|
|
$watcher->wrapper = $wrapper;
|
|
|
|
event_base_set($eventResource, $this->base);
|
2013-08-29 08:38:13 +02:00
|
|
|
|
2013-09-11 19:00:43 +02:00
|
|
|
if ($watcher->isEnabled) {
|
|
|
|
event_add($eventResource, $watcher->interval);
|
2013-08-29 08:38:13 +02:00
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2013-09-11 19:00:43 +02:00
|
|
|
}
|