1
0
mirror of https://github.com/danog/amp.git synced 2024-11-30 04:29:08 +01:00

Update Reactor WATCH constants, remove Reactor::POLL_SOCK

This commit is contained in:
Daniel Lowrey 2014-08-05 23:45:33 -04:00
parent 1807f34041
commit 5878430a0c
5 changed files with 50 additions and 42 deletions

View File

@ -197,15 +197,15 @@ class LibeventReactor implements SignalReactor {
public function watchStream($stream, $flags, callable $callback) {
$flags = (int) $flags;
$enableNow = ($flags & self::ENABLE_NOW);
$enableNow = ($flags & self::WATCH_NOW);
if ($flags & self::POLL_READ) {
if ($flags & self::WATCH_READ) {
return $this->onWritable($stream, $callback, $enableNow);
} elseif ($flags & self::POLL_WRITE) {
} elseif ($flags & self::WATCH_WRITE) {
return $this->onWritable($stream, $callback, $enableNow);
} else {
throw new \DomainException(
'Stream watchers must specify either a POLL_READ or POLL_WRITE flag'
'Stream watchers must specify either a WATCH_READ or WATCH_WRITE flag'
);
}
}

View File

@ -215,15 +215,15 @@ class NativeReactor implements Reactor {
public function watchStream($stream, $flags, callable $callback) {
$flags = (int) $flags;
$enableNow = ($flags & self::ENABLE_NOW);
$enableNow = ($flags & self::WATCH_NOW);
if ($flags & self::POLL_READ) {
if ($flags & self::WATCH_READ) {
return $this->onWritable($stream, $callback, $enableNow);
} elseif ($flags & self::POLL_WRITE) {
} elseif ($flags & self::WATCH_WRITE) {
return $this->onWritable($stream, $callback, $enableNow);
} else {
throw new \DomainException(
'Stream watchers must specify either a POLL_READ or POLL_WRITE flag'
'Stream watchers must specify either a WATCH_READ or WATCH_WRITE flag'
);
}
}

View File

@ -3,10 +3,9 @@
namespace Alert;
interface Reactor {
const POLL_READ = 1;
const POLL_WRITE = 2;
const POLL_SOCK = 4;
const ENABLE_NOW = 8;
const WATCH_READ = 0b001;
const WATCH_WRITE = 0b010;
const WATCH_NOW = 0b100;
/**
* Start the event reactor and assume program flow control
@ -101,7 +100,7 @@ interface Reactor {
*
* @param resource $stream A stream resource to watch for writability
* @param callable $callback Any valid PHP callable
* @param int $flags Option bitmask (Reactor::POLL_READ, Reactor::POLL_WRITE, etc)
* @param int $flags Option bitmask (Reactor::WATCH_READ, Reactor::WATCH_WRITE, etc)
*/
public function watchStream($stream, $flags, callable $callback);

View File

@ -8,32 +8,36 @@ class UvReactor implements Reactor {
private $gcWatcher;
private $gcCallback;
private $garbage = [];
private $isGcScheduled = FALSE;
private $isRunning = FALSE;
private $isGcScheduled = false;
private $isRunning = false;
private $stopException;
private $resolution = 1000;
private $isWindows;
private static $MODE_ONCE = 0;
private static $MODE_REPEAT = 1;
private static $MODE_STREAM = 2;
public function __construct($newLoop = FALSE) {
public function __construct($newLoop = false) {
$this->loop = $newLoop ? uv_loop_new() : uv_default_loop();
$this->gcWatcher = uv_timer_init($this->loop);
$this->gcCallback = function() { $this->collectGarbage(); };
$this->isWindows = (stripos(PHP_OS, 'win') === 0);
}
private function collectGarbage() {
$this->garbage = [];
$this->isGcScheduled = FALSE;
$this->isGcScheduled = false;
}
/**
* Start the event reactor and assume program flow control
*
* @param $onStart Optional callback to invoke immediately upon reactor start
* @param callable $onStart Optional callback to invoke immediately upon reactor start
* @throws \Exception Will throw if code executed during the event loop throws
* @return void
*/
public function run(callable $onStart = NULL) {
public function run(callable $onStart = null) {
if ($this->isRunning) {
return;
}
@ -41,11 +45,11 @@ class UvReactor implements Reactor {
$this->isRunning = TRUE;
$this->immediately(function() use ($onStart) { $onStart($this); });
uv_run($this->loop);
$this->isRunning = FALSE;
$this->isRunning = false;
if ($this->stopException) {
$e = $this->stopException;
$this->stopException = NULL;
$this->stopException = null;
throw $e;
}
}
@ -56,11 +60,11 @@ class UvReactor implements Reactor {
public function tick() {
$this->isRunning = TRUE;
uv_run_once($this->loop);
$this->isRunning = FALSE;
$this->isRunning = false;
if ($this->stopException) {
$e = $this->stopException;
$this->stopException = NULL;
$this->stopException = null;
throw $e;
}
}
@ -143,13 +147,14 @@ class UvReactor implements Reactor {
*
* @param callable $callback Any valid PHP callable
* @param string $timeString Any string that can be parsed by strtotime() and is in the future
* @TODO Implement me.
* @throws \InvalidArgumentException if $timeString parse fails
* @return int
*/
public function at(callable $callback, $timeString) {
$now = time();
$executeAt = @strtotime($timeString);
if ($executeAt === FALSE || $executeAt <= $now) {
if ($executeAt === false || $executeAt <= $now) {
throw new \InvalidArgumentException(
'Valid future time string (parsable by strtotime()) required'
);
@ -169,7 +174,7 @@ class UvReactor implements Reactor {
* @return int Returns a unique integer watcher ID
*/
public function onReadable($stream, callable $callback, $enableNow = TRUE) {
$flags = $enableNow ? (self::POLL_READ | self::ENABLE_NOW) : SELF::POLL_READ;
$flags = $enableNow ? (self::WATCH_READ | self::WATCH_NOW) : SELF::WATCH_READ;
return $this->watchStream($stream, $flags, $callback);
}
@ -183,7 +188,7 @@ class UvReactor implements Reactor {
* @return int Returns a unique integer watcher ID
*/
public function onWritable($stream, callable $callback, $enableNow = TRUE) {
$flags = $enableNow ? (self::POLL_WRITE | self::ENABLE_NOW) : SELF::POLL_WRITE;
$flags = $enableNow ? (self::WATCH_WRITE | self::WATCH_NOW) : SELF::WATCH_WRITE;
return $this->watchStream($stream, $flags, $callback);
}
@ -191,32 +196,30 @@ class UvReactor implements Reactor {
/**
* Watch a stream resource for reads or writes (but not both) with additional option flags
*
* NOTE: Windows users MUST specify the Reactor::POLL_SOCK flag when watching a socket
* stream or ext/sockets resource -- this is a limitation of the underlying C code.
*
* @param resource $stream
* @param int $flags A bitmask of watch flags
* @param callable $callback
* @throws \DomainException if no read/write flag specified
* @return int
*/
public function watchStream($stream, $flags, callable $callback) {
$flags = (int) $flags;
if ($flags & self::POLL_READ) {
if ($flags & self::WATCH_READ) {
$pollFlag = \UV::READABLE;
} elseif ($flags & self::POLL_WRITE) {
} elseif ($flags & self::WATCH_WRITE) {
$pollFlag = \UV::WRITABLE;
} else {
throw new \DomainException(
'Stream watchers must specify either a POLL_READ or POLL_WRITE flag'
'Stream watchers must specify either a WATCH_READ or WATCH_WRITE flag'
);
}
// Windows requires the socket-specific init function, so we use the POLL_SOCK flag to
// maximize cross-OS compatibility when polling sockets. This one-off option is a major
// reason for the existence of Reactor::watchStream() because we need an easy way to
// specify flags that may not be applicable across all reactor implementations without
// simultaneously fractaling out the interface API.
$pollStartFunc = ($flags & self::POLL_SOCK) ? 'uv_poll_init_socket' : 'uv_poll_init';
// Windows requires the socket-specific init function, so make sure we choose that
// specifically when using tcp/ssl streams
$pollStartFunc = $this->isWindows
? $this->chooseWindowsPollingFunction($stream)
: 'uv_poll_init';
$watcherId = $this->lastWatcherId++;
@ -227,7 +230,7 @@ class UvReactor implements Reactor {
$watcher->pollFlag = $pollFlag;
$watcher->uvStruct = $pollStartFunc($this->loop, $stream);
$watcher->callback = $this->wrapStreamCallback($watcher, $callback);
if ($watcher->isEnabled = ($flags & self::ENABLE_NOW)) {
if ($watcher->isEnabled = ($flags & self::WATCH_NOW)) {
uv_poll_start($watcher->uvStruct, $watcher->pollFlag, $watcher->callback);
}
@ -236,6 +239,12 @@ class UvReactor implements Reactor {
return $watcherId;
}
private function chooseWindowsPollingFunction($stream) {
return (stream_get_meta_data($stream)['stream_type'] === 'tcp_socket/ssl')
? 'uv_poll_init_socket'
: 'uv_poll_init';
}
private function wrapStreamCallback($watcher, $callback) {
return function() use ($watcher, $callback) {
try {
@ -290,7 +299,7 @@ class UvReactor implements Reactor {
if ($watcher->isEnabled) {
$stopFunc = ($watcher instanceof UvIoWatcher) ? 'uv_poll_stop' : 'uv_timer_stop';
$stopFunc($watcher->uvStruct);
$watcher->isEnabled = FALSE;
$watcher->isEnabled = false;
}
}

View File

@ -157,7 +157,7 @@ function onWritable($stream, callable $func, $enableNow = true) {
* will lead to memory leaks.
*
* @param resource $stream A stream resource to watch for IO capability
* @param int $flags Option bitmask (Reactor::POLL_READ, Reactor::POLL_WRITE, etc)
* @param int $flags Option bitmask (Reactor::WATCH_READ, Reactor::WATCH_WRITE, etc)
* @param callable $func Any valid PHP callable
*/
function watchStream($stream, $flags, callable $func) {