1
0
mirror of https://github.com/danog/amp.git synced 2024-11-30 04:29:08 +01:00

Allow multiple IO watchers on a stream without borking libuv

This commit is contained in:
Daniel Lowrey 2014-11-11 16:07:15 -05:00
parent ccc0bdcbe0
commit 2d5a1eb643
4 changed files with 176 additions and 68 deletions

View File

@ -4,10 +4,9 @@ namespace Amp;
class UvIoWatcher extends Struct {
public $id;
public $uvStruct;
public $callback;
public $stream;
public $pollFlag;
public $mode;
public $poll;
public $stream;
public $callback;
public $isEnabled;
}

12
lib/UvPoll.php Normal file
View File

@ -0,0 +1,12 @@
<?php
namespace Amp;
class UvPoll extends Struct {
public $flags;
public $handle;
public $callback;
public $readers = [];
public $writers = [];
public $disable = [];
}

View File

@ -6,6 +6,7 @@ class UvReactor implements SignalReactor {
private $loop;
private $lastWatcherId = 1;
private $watchers;
private $streamIdPollMap = [];
private $gcWatcher;
private $gcCallback;
private $garbage = [];
@ -20,13 +21,10 @@ class UvReactor implements SignalReactor {
private static $MODE_ONCE = 0;
private static $MODE_REPEAT = 1;
private static $MODE_STREAM = 2;
private static $MODE_SIGNAL = 3;
private static $MODE_IMMEDIATE = 4;
private static $WATCH_READ = 0b001;
private static $WATCH_WRITE = 0b010;
private static $WATCH_NOW = 0b100;
private static $MODE_READER = 2;
private static $MODE_WRITER = 3;
private static $MODE_SIGNAL = 4;
private static $MODE_IMMEDIATE = 5;
public function __construct($newLoop = false) {
$this->loop = $newLoop ? uv_loop_new() : uv_default_loop();
@ -179,7 +177,7 @@ class UvReactor implements SignalReactor {
$msInterval = ($msInterval && $msInterval > 0) ? (int) $msInterval : -1;
return ($msInterval === -1)
? $this->watchStream(STDOUT, $callback, self::$WATCH_WRITE | self::$WATCH_NOW)
? $this->watchStream(STDOUT, $callback, self::$MODE_WRITER, true)
: $this->startTimer($callback, $msInterval, $msInterval, self::$MODE_REPEAT);
}
@ -251,9 +249,7 @@ class UvReactor implements SignalReactor {
* @return string Returns a unique watcher ID
*/
public function onReadable($stream, callable $callback, $enableNow = true) {
$flags = $enableNow ? (self::$WATCH_READ | self::$WATCH_NOW) : self::$WATCH_READ;
return $this->watchStream($stream, $callback, $flags);
return $this->watchStream($stream, $callback, self::$MODE_READER, (bool) $enableNow);
}
/**
@ -265,48 +261,74 @@ class UvReactor implements SignalReactor {
* @return string Returns a unique watcher ID
*/
public function onWritable($stream, callable $callback, $enableNow = true) {
$flags = $enableNow ? (self::$WATCH_WRITE | self::$WATCH_NOW) : self::$WATCH_WRITE;
return $this->watchStream($stream, $callback, $flags);
return $this->watchStream($stream, $callback, self::$MODE_WRITER, (bool) $enableNow);
}
private function watchStream($stream, callable $callback, $flags) {
$flags = (int) $flags;
private function watchStream($stream, callable $callback, $mode, $enableNow) {
$streamId = (int) $stream;
$poll = isset($this->streamIdPollMap[$streamId])
? $this->streamIdPollMap[$streamId]
: $this->makePollHandle($stream);
if ($flags & self::$WATCH_READ) {
/** @noinspection PhpUndefinedClassInspection */
$pollFlag = \UV::READABLE;
} elseif ($flags & self::$WATCH_WRITE) {
/** @noinspection PhpUndefinedClassInspection */
$pollFlag = \UV::WRITABLE;
} else {
throw new \DomainException(
'Stream watchers must specify either a WATCH_READ or WATCH_WRITE flag'
);
$watcherId = $this->lastWatcherId;
$this->watchers[$watcherId] = $watcher = new UvIoWatcher;
$watcher->id = $watcherId = $this->lastWatcherId++;
$watcher->mode = $mode;
$watcher->poll = $poll;
$watcher->stream = $stream;
$watcher->callback = $callback;
$watcher->isEnabled = $enableNow;
if ($enableNow === false) {
$poll->disable[$watcherId] = $watcher;
return $watcherId;
}
// Windows requires the socket-specific init function, so make sure we choose that
// specifically when using tcp/ssl streams
$pollStartFunc = $this->isWindows
if ($mode === self::$MODE_READER) {
$poll->readers[$watcherId] = $watcher;
} else {
$poll->writers[$watcherId] = $watcher;
}
$preexistingFlags = $poll->flags;
if ($poll->readers) {
$poll->flags |= \UV::READABLE;
}
if ($poll->writers) {
$poll->flags |= \UV::WRITABLE;
}
if ($preexistingFlags != $poll->flags) {
uv_poll_start($poll->handle, $poll->flags, $poll->callback);
}
return $watcherId;
}
private function makePollHandle($stream) {
// Windows needs the socket-specific init function, so make sure we use
// it when dealing with tcp/ssl streams.
$pollInitFunc = $this->isWindows
? $this->chooseWindowsPollingFunction($stream)
: 'uv_poll_init';
$watcherId = (string) $this->lastWatcherId++;
$streamId = (int) $stream;
$this->streamIdPollMap[$streamId] = $poll = new UvPoll;
$poll->flags = 0;
$poll->handle = $pollInitFunc($this->loop, $stream);
$poll->callback = function($uvHandle, $stat, $events) use ($poll) {
if ($events & \UV::READABLE) {
foreach ($poll->readers as $watcher) {
$this->invokePollWatcher($watcher);
}
}
if ($events & \UV::WRITABLE) {
foreach ($poll->writers as $watcher) {
$this->invokePollWatcher($watcher);
}
}
};
$watcher = new UvIoWatcher;
$watcher->id = $watcherId;
$watcher->mode = self::$MODE_STREAM;
$watcher->stream = $stream;
$watcher->pollFlag = $pollFlag;
$watcher->uvStruct = $pollStartFunc($this->loop, $stream);
$watcher->callback = $this->wrapStreamCallback($watcher, $callback);
if ($watcher->isEnabled = ($flags & self::$WATCH_NOW)) {
uv_poll_start($watcher->uvStruct, $watcher->pollFlag, $watcher->callback);
}
$this->watchers[$watcherId] = $watcher;
return $watcherId;
return $poll;
}
private function chooseWindowsPollingFunction($stream) {
@ -317,18 +339,17 @@ class UvReactor implements SignalReactor {
: 'uv_poll_init';
}
private function wrapStreamCallback($watcher, $callback) {
return function() use ($watcher, $callback) {
try {
$result = $callback($this, $watcher->id, $watcher->stream);
if ($result instanceof \Generator) {
$this->resolver->resolve($result)->when($this->onGeneratorError);
}
} catch (\Exception $e) {
$this->stopException = $e;
$this->stop();
private function invokePollWatcher(UvIoWatcher $watcher) {
try {
$callback = $watcher->callback;
$result = $callback($this, $watcher->id, $watcher->stream);
if ($result instanceof \Generator) {
$this->resolver->resolve($result)->when($this->onGeneratorError);
}
};
} catch (\Exception $e) {
$this->stopException = $e;
$this->stop();
}
}
/**
@ -386,8 +407,10 @@ class UvReactor implements SignalReactor {
if ($watcher->isEnabled) {
switch ($watcher->mode) {
case self::$MODE_STREAM:
uv_poll_stop($watcher->uvStruct);
case self::$MODE_READER:
// fallthrough
case self::$MODE_WRITER:
$this->clearPollFromWatcher($watcher);
break;
case self::$MODE_SIGNAL:
uv_signal_stop($watcher->uvStruct);
@ -409,6 +432,36 @@ class UvReactor implements SignalReactor {
}
}
private function clearPollFromWatcher(UvIoWatcher $watcher) {
$poll = $watcher->poll;
$watcherId = $watcher->id;
unset(
$poll->readers[$watcherId],
$poll->writers[$watcherId],
$poll->disable[$watcherId]
);
// If any watchers are still enabled for this stream we're finished here
$hasEnabledWatchers = ((int) $poll->readers) + ((int) $poll->writers);
if ($hasEnabledWatchers) {
return;
}
// Always stop polling if no enabled watchers remain
uv_poll_stop($poll->handle);
// If all watchers are disabled we can pull out here
$hasDisabledWatchers = (bool) $poll->disable;
if ($hasDisabledWatchers) {
return;
}
// Otherwise there are no watchers left for this poll and we should clear it
$streamId = (int) $watcher->stream;
unset($this->streamIdPollMap[$streamId]);
}
/**
* Temporarily disable (but don't cancel) an existing timer/stream watcher
*
@ -427,8 +480,10 @@ class UvReactor implements SignalReactor {
}
switch ($watcher->mode) {
case self::$MODE_STREAM:
uv_poll_stop($watcher->uvStruct);
case self::$MODE_READER:
// fallthrough
case self::$MODE_WRITER:
$this->disablePollFromWatcher($watcher);
break;
case self::$MODE_SIGNAL:
uv_signal_stop($watcher->uvStruct);
@ -444,6 +499,25 @@ class UvReactor implements SignalReactor {
$watcher->isEnabled = false;
}
private function disablePollFromWatcher(UvIoWatcher $watcher) {
$poll = $watcher->poll;
$watcherId = $watcher->id;
if ($watcher->mode === self::$MODE_READER) {
unset($poll->readers[$watcherId]);
} else {
unset($poll->writers[$watcherId]);
}
$poll->disable[$watcherId] = $watcher;
// If no enabled watchers remain for this stream we need to disable polling
$shouldStopPolling = !($poll->readers || $poll->writers);
if ($shouldStopPolling) {
uv_poll_stop($poll->handle);
}
}
/**
* Enable a disabled timer/stream watcher
*
@ -462,8 +536,10 @@ class UvReactor implements SignalReactor {
}
switch ($watcher->mode) {
case self::$MODE_STREAM:
uv_poll_start($watcher->uvStruct, $watcher->pollFlag, $watcher->callback);
case self::$MODE_READER:
// fallthrough
case self::$MODE_WRITER:
$this->enablePollFromWatcher($watcher);
break;
case self::$MODE_SIGNAL:
uv_signal_start($watcher->uvStruct, $watcher->callback, $watcher->signo);
@ -479,6 +555,27 @@ class UvReactor implements SignalReactor {
$watcher->isEnabled = true;
}
private function enablePollFromWatcher(UvIoWatcher $watcher) {
$poll = $watcher->poll;
$watcherId = $watcher->id;
unset($poll->disable[$watcherId]);
$preexistingFlags = $poll->flags;
if ($watcher->mode === self::$MODE_READER) {
$poll->flags |= \UV::READABLE;
$poll->readers[$watcherId] = $watcher;
} else {
$poll->flags |= \UV::WRITABLE;
$poll->writers[$watcherId] = $watcher;
}
if ($preexistingFlags != $poll->flags) {
uv_poll_start($poll->handle, $poll->flags, $poll->callback);
}
}
/**
* Access the underlying php-uv extension loop resource
*

View File

@ -244,11 +244,11 @@ abstract class ReactorTest extends \PHPUnit_Framework_TestCase {
$increment++;
}, $isEnabled = FALSE);
$reactor->once(function() use ($reactor, $watcherId) {
$reactor->immediately(function() use ($reactor, $watcherId) {
$reactor->enable($watcherId);
}, $msDelay = 10);
});
$reactor->once([$reactor, 'stop'], $msDelay = 100);
$reactor->once([$reactor, 'stop'], $msDelay = 250);
$reactor->run();
$this->assertTrue($increment > 0);