mirror of
https://github.com/danog/amp.git
synced 2024-11-30 04:29:08 +01:00
Add UvReactor signal handling support
This commit is contained in:
parent
383a90cb68
commit
15b7b4ad80
14
lib/SignalReactor.php
Executable file → Normal file
14
lib/SignalReactor.php
Executable file → Normal file
@ -4,12 +4,12 @@ namespace Alert;
|
||||
|
||||
interface SignalReactor extends Reactor {
|
||||
|
||||
/**
|
||||
* React to process control signals
|
||||
*
|
||||
* @param int $signal The signal to watch for (e.g. 2 for SIGINT)
|
||||
* @param callable $onSignal
|
||||
* @return int Returns a unique integer watcher ID
|
||||
/**
|
||||
* React to process control signals
|
||||
*
|
||||
* @param int $signo The signal number to watch for
|
||||
* @param callable $onSignal
|
||||
* @return int Returns a unique integer watcher ID
|
||||
*/
|
||||
public function onSignal($signal, callable $onSignal);
|
||||
public function onSignal($signo, callable $onSignal);
|
||||
}
|
||||
|
@ -2,7 +2,7 @@
|
||||
|
||||
namespace Alert;
|
||||
|
||||
class UvReactor implements Reactor {
|
||||
class UvReactor implements SignalReactor {
|
||||
private $loop;
|
||||
private $lastWatcherId = 0;
|
||||
private $gcWatcher;
|
||||
@ -17,6 +17,7 @@ class UvReactor implements Reactor {
|
||||
private static $MODE_ONCE = 0;
|
||||
private static $MODE_REPEAT = 1;
|
||||
private static $MODE_STREAM = 2;
|
||||
private static $MODE_SIGNAL = 3;
|
||||
|
||||
public function __construct($newLoop = false) {
|
||||
$this->loop = $newLoop ? uv_loop_new() : uv_default_loop();
|
||||
@ -42,7 +43,7 @@ class UvReactor implements Reactor {
|
||||
return;
|
||||
}
|
||||
|
||||
$this->isRunning = TRUE;
|
||||
$this->isRunning = true;
|
||||
$this->immediately(function() use ($onStart) { $onStart($this); });
|
||||
uv_run($this->loop);
|
||||
$this->isRunning = false;
|
||||
@ -58,7 +59,7 @@ class UvReactor implements Reactor {
|
||||
* Execute a single event loop iteration
|
||||
*/
|
||||
public function tick() {
|
||||
$this->isRunning = TRUE;
|
||||
$this->isRunning = true;
|
||||
uv_run_once($this->loop);
|
||||
$this->isRunning = false;
|
||||
|
||||
@ -119,7 +120,7 @@ class UvReactor implements Reactor {
|
||||
$watcher->callback = $this->wrapTimerCallback($watcher, $callback);
|
||||
$watcher->msDelay = ($msDelay > 0) ? (int) $msDelay : 0;
|
||||
$watcher->msInterval = ($msInterval > 0) ? (int) $msInterval : 0;
|
||||
$watcher->isEnabled = TRUE;
|
||||
$watcher->isEnabled = true;
|
||||
|
||||
$this->watchers[$watcher->id] = $watcher;
|
||||
|
||||
@ -173,7 +174,7 @@ class UvReactor implements Reactor {
|
||||
* @param bool $enableNow Should the watcher be enabled now or held for later use?
|
||||
* @return int Returns a unique integer watcher ID
|
||||
*/
|
||||
public function onReadable($stream, callable $callback, $enableNow = TRUE) {
|
||||
public function onReadable($stream, callable $callback, $enableNow = true) {
|
||||
$flags = $enableNow ? (self::WATCH_READ | self::WATCH_NOW) : SELF::WATCH_READ;
|
||||
|
||||
return $this->watchStream($stream, $flags, $callback);
|
||||
@ -187,7 +188,7 @@ class UvReactor implements Reactor {
|
||||
* @param bool $enableNow Should the watcher be enabled now or held for later use?
|
||||
* @return int Returns a unique integer watcher ID
|
||||
*/
|
||||
public function onWritable($stream, callable $callback, $enableNow = TRUE) {
|
||||
public function onWritable($stream, callable $callback, $enableNow = true) {
|
||||
$flags = $enableNow ? (self::WATCH_WRITE | self::WATCH_NOW) : SELF::WATCH_WRITE;
|
||||
|
||||
return $this->watchStream($stream, $flags, $callback);
|
||||
@ -200,7 +201,7 @@ class UvReactor implements Reactor {
|
||||
* @param int $flags A bitmask of watch flags
|
||||
* @param callable $callback
|
||||
* @throws \DomainException if no read/write flag specified
|
||||
* @return int
|
||||
* @return int Returns a unique integer watcher ID
|
||||
*/
|
||||
public function watchStream($stream, $flags, callable $callback) {
|
||||
$flags = (int) $flags;
|
||||
@ -256,10 +257,45 @@ class UvReactor implements Reactor {
|
||||
};
|
||||
}
|
||||
|
||||
/**
|
||||
* React to process control signals
|
||||
*
|
||||
* @param int $signo The signal number to watch for (e.g. 2 for Uv::SIGINT)
|
||||
* @param callable $onSignal
|
||||
* @return int Returns a unique integer watcher ID
|
||||
*/
|
||||
public function onSignal($signo, callable $onSignal) {
|
||||
$watcher = new UvSignalWatcher;
|
||||
$watcher->id = $this->lastWatcherId++;
|
||||
$watcher->mode = self::$MODE_SIGNAL;
|
||||
$watcher->signo = $signo;
|
||||
$watcher->uvStruct = uv_signal_init($this->loop);
|
||||
$watcher->callback = $this->wrapSignalCallback($watcher, $onSignal);
|
||||
$watcher->isEnabled = true;
|
||||
|
||||
uv_signal_start($watcher->uvStruct, $watcher->uvStruct, $watcher->signo);
|
||||
|
||||
$this->watchers[$watcher->id] = $watcher;
|
||||
|
||||
return $watcher->id;
|
||||
}
|
||||
|
||||
private function wrapSignalCallback($watcher, $callback) {
|
||||
return function() use ($watcher, $callback) {
|
||||
try {
|
||||
$callback($watcher->id, $watcher->signo, $this);
|
||||
} catch (\Exception $e) {
|
||||
$this->stopException = $e;
|
||||
$this->stop();
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
/**
|
||||
* Cancel an existing watcher
|
||||
*
|
||||
* @param int $watcherId
|
||||
* @return void
|
||||
*/
|
||||
public function cancel($watcherId) {
|
||||
if (isset($this->watchers[$watcherId])) {
|
||||
@ -272,15 +308,24 @@ class UvReactor implements Reactor {
|
||||
unset($this->watchers[$watcherId]);
|
||||
|
||||
if ($watcher->isEnabled) {
|
||||
$stopFunc = ($watcher instanceof UvIoWatcher) ? 'uv_poll_stop' : 'uv_timer_stop';
|
||||
$stopFunc($watcher->uvStruct);
|
||||
switch ($watcher->mode) {
|
||||
case self::$MODE_STREAM:
|
||||
uv_poll_stop($watcher->uvStruct);
|
||||
break;
|
||||
case self::$MODE_SIGNAL:
|
||||
uv_signal_stop($watcher->uvStruct);
|
||||
break;
|
||||
default:
|
||||
uv_timer_stop($watcher->uvStruct);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
$this->garbage[] = $watcher;
|
||||
|
||||
if (!$this->isGcScheduled) {
|
||||
uv_timer_start($this->gcWatcher, 250, 0, $this->gcCallback);
|
||||
$this->isGcScheduled = TRUE;
|
||||
$this->isGcScheduled = true;
|
||||
}
|
||||
}
|
||||
|
||||
@ -288,6 +333,7 @@ class UvReactor implements Reactor {
|
||||
* Temporarily disable (but don't cancel) an existing timer/stream watcher
|
||||
*
|
||||
* @param int $watcherId
|
||||
* @return void
|
||||
*/
|
||||
public function disable($watcherId) {
|
||||
if (!isset($this->watchers[$watcherId])) {
|
||||
@ -296,17 +342,30 @@ class UvReactor implements Reactor {
|
||||
|
||||
$watcher = $this->watchers[$watcherId];
|
||||
|
||||
if ($watcher->isEnabled) {
|
||||
$stopFunc = ($watcher instanceof UvIoWatcher) ? 'uv_poll_stop' : 'uv_timer_stop';
|
||||
$stopFunc($watcher->uvStruct);
|
||||
$watcher->isEnabled = false;
|
||||
if (!$watcher->isEnabled) {
|
||||
return;
|
||||
}
|
||||
|
||||
switch ($watcher->mode) {
|
||||
case self::$MODE_STREAM:
|
||||
uv_poll_stop($watcher->uvStruct);
|
||||
break;
|
||||
case self::$MODE_SIGNAL:
|
||||
uv_signal_stop($watcher->uvStruct);
|
||||
break;
|
||||
default:
|
||||
uv_timer_stop($watcher->uvStruct);
|
||||
break;
|
||||
}
|
||||
|
||||
$watcher->isEnabled = false;
|
||||
}
|
||||
|
||||
/**
|
||||
* Enable a disabled timer/stream watcher
|
||||
*
|
||||
* @param int $watcherId
|
||||
* @return void
|
||||
*/
|
||||
public function enable($watcherId) {
|
||||
if (!isset($this->watchers[$watcherId])) {
|
||||
@ -319,12 +378,18 @@ class UvReactor implements Reactor {
|
||||
return;
|
||||
}
|
||||
|
||||
if ($watcher->mode === self::$MODE_STREAM) {
|
||||
uv_poll_start($watcher->uvStruct, $watcher->pollFlag, $watcher->callback);
|
||||
} else {
|
||||
uv_timer_start($watcher->uvStruct, $watcher->msDelay, $watcher->msInterval, $watcher->callback);
|
||||
switch ($watcher->mode) {
|
||||
case self::$MODE_STREAM:
|
||||
uv_poll_start($watcher->uvStruct, $watcher->pollFlag, $watcher->callback);
|
||||
break;
|
||||
case self::$MODE_SIGNAL:
|
||||
uv_signal_start($watcher->uvStruct, $watcher->callback, $watcher->signo);
|
||||
break;
|
||||
default:
|
||||
uv_timer_start($watcher->uvStruct, $watcher->msDelay, $watcher->msInterval, $watcher->callback);
|
||||
break;
|
||||
}
|
||||
|
||||
$watcher->isEnabled = TRUE;
|
||||
$watcher->isEnabled = true;
|
||||
}
|
||||
}
|
||||
|
12
lib/UvSignalWatcher.php
Normal file
12
lib/UvSignalWatcher.php
Normal file
@ -0,0 +1,12 @@
|
||||
<?php
|
||||
|
||||
namespace Alert;
|
||||
|
||||
class UvSignalWatcher {
|
||||
public $id;
|
||||
public $mode;
|
||||
public $signo;
|
||||
public $uvStruct;
|
||||
public $callback;
|
||||
public $isEnabled;
|
||||
}
|
@ -212,3 +212,24 @@ function reactor(callable $factory = null) {
|
||||
static $reactor;
|
||||
return ($reactor = $reactor ?: ReactorFactory::select($factory));
|
||||
}
|
||||
|
||||
/**
|
||||
* React to process control signals
|
||||
*
|
||||
* @param int $signo The signal number to watch for
|
||||
* @param callable $onSignal
|
||||
* @throws \RuntimeException if the current environment cannot support signal handling
|
||||
* @return int Returns a unique integer watcher ID
|
||||
*/
|
||||
function onSignal($signo, callable $onSignal) {
|
||||
static $reactor;
|
||||
if ($reactor) {
|
||||
return $reactor->onSignal($signo, $onSignal);
|
||||
} elseif (!($reactor = ReactorFactory::select() instanceof SignalReactor)) {
|
||||
throw new \RuntimeException(
|
||||
'Your PHP environment does not support signal handling. Please install pecl/libevent or the php-uv extension'
|
||||
);
|
||||
} else {
|
||||
return $reactor->onSignal($signo, $onSignal);
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user