1
0
mirror of https://github.com/danog/amp.git synced 2024-11-30 04:29:08 +01:00

Add Reactor::__debugInfo() for easier debugging

This commit is contained in:
Daniel Lowrey 2014-11-26 16:27:30 -05:00
parent f686691892
commit 14c209012c
9 changed files with 240 additions and 105 deletions

View File

@ -15,6 +15,8 @@ class LibeventReactor implements SignalReactor {
private $stopException;
private $onGeneratorError;
private static $instanceCount = 0;
public function __construct() {
$this->base = event_base_new();
$this->gcEvent = event_new();
@ -25,6 +27,7 @@ class LibeventReactor implements SignalReactor {
throw $e;
}
};
self::$instanceCount++;
}
/**
@ -152,6 +155,7 @@ class LibeventReactor implements SignalReactor {
$watcher = new \StdClass;
$watcher->id = $watcherId;
$watcher->type = Watcher::IMMEDIATE;
$watcher->callback = $callback;
$watcher->isEnabled = true;
@ -172,8 +176,9 @@ class LibeventReactor implements SignalReactor {
$eventResource = event_new();
$msDelay = ($msDelay > 0) ? ($msDelay * $this->resolution) : 0;
$watcher = new LibeventWatcher;
$watcher = new LibeventTimerWatcher;
$watcher->id = $watcherId;
$watcher->type = Watcher::TIMER_ONCE;
$watcher->eventResource = $eventResource;
$watcher->msDelay = $msDelay;
$watcher->callback = $callback;
@ -218,8 +223,9 @@ class LibeventReactor implements SignalReactor {
$msDelay = ($msDelay > 0) ? ($msDelay * $this->resolution) : 0;
$eventResource = event_new();
$watcher = new LibeventWatcher;
$watcher = new LibeventTimerWatcher;
$watcher->id = $watcherId;
$watcher->type = Watcher::TIMER_REPEAT;
$watcher->eventResource = $eventResource;
$watcher->msDelay = $msDelay;
$watcher->callback = $callback;
@ -263,7 +269,7 @@ class LibeventReactor implements SignalReactor {
* @return string Returns a unique watcher ID
*/
public function onReadable($stream, callable $callback, $enableNow = true) {
return $this->watchIoStream($stream, EV_READ | EV_PERSIST, $callback, $enableNow);
return $this->watchIoStream($stream, Watcher::IO_READER, $callback, $enableNow);
}
/**
@ -275,15 +281,18 @@ class LibeventReactor implements SignalReactor {
* @return string Returns a unique watcher ID
*/
public function onWritable($stream, callable $callback, $enableNow = true) {
return $this->watchIoStream($stream, EV_WRITE | EV_PERSIST, $callback, $enableNow);
return $this->watchIoStream($stream, Watcher::IO_WRITER, $callback, $enableNow);
}
private function watchIoStream($stream, $flags, callable $callback, $enableNow) {
private function watchIoStream($stream, $type, callable $callback, $enableNow) {
$watcherId = (string) $this->lastWatcherId++;
$eventResource = event_new();
$flags = EV_PERSIST;
$flags |= ($type === Watcher::IO_READER) ? EV_READ : EV_WRITE;
$watcher = new LibeventWatcher;
$watcher = new LibeventIoWatcher;
$watcher->id = $watcherId;
$watcher->type = $type;
$watcher->stream = $stream;
$watcher->callback = $callback;
$watcher->wrapper = $this->wrapStreamCallback($watcher);
@ -333,6 +342,7 @@ class LibeventReactor implements SignalReactor {
$eventResource = event_new();
$watcher = new LibeventWatcher;
$watcher->id = $watcherId;
$watcher->type = Watcher::SIGNAL;
$watcher->signo = $signo;
$watcher->eventResource = $eventResource;
$watcher->callback = $onSignal;
@ -437,8 +447,10 @@ class LibeventReactor implements SignalReactor {
if (empty($watcher->eventResource)) {
// It's an immediately watcher
$this->immediates[$watcherId] = $watcher->callback;
} else {
} elseif ($watcher->type & Watcher::TIMER) {
event_add($watcher->eventResource, $watcher->msDelay);
} else {
event_add($watcher->eventResource);
}
$watcher->isEnabled = true;
@ -469,4 +481,72 @@ class LibeventReactor implements SignalReactor {
public function getUnderlyingLoop() {
return $this->base;
}
public function __destruct() {
self::$instanceCount--;
}
public function __debugInfo() {
$immediates = $timers = $readers = $writers = $signals = $disabled = 0;
foreach ($this->watchers as $watcher) {
switch ($watcher->type) {
case Watcher::IMMEDIATE:
$immediates++;
break;
case Watcher::TIMER_ONCE:
case Watcher::TIMER_REPEAT:
$timers++;
break;
case Watcher::IO_READER:
$readers++;
break;
case Watcher::IO_WRITER:
$writers++;
break;
case Watcher::SIGNAL:
$signals++;
break;
default:
throw new \DomainException(
"Unexpected watcher type: {$watcher->type}"
);
}
$disabled += !$watcher->isEnabled;
}
return [
'timers' => $timers,
'immediates' => $immediates,
'io_readers' => $readers,
'io_writers' => $writers,
'signals' => $signals,
'disabled' => $disabled,
'last_watcher_id' => $this->lastWatcherId,
'instances' => self::$instanceCount,
];
}
}
class LibeventWatcher extends Watcher {
// Inherited from Watcher:
// public $id;
// public $type;
// public $isEnabled;
public $eventResource;
public $callback;
public $wrapper;
}
class LibeventSignalWatcher extends LibeventWatcher {
public $signo;
}
class LibeventIoWatcher extends LibeventWatcher {
public $stream;
}
class LibeventTimerWatcher extends LibeventWatcher {
public $msDelay = -1;
}

View File

@ -1,14 +0,0 @@
<?php
namespace Amp;
class LibeventWatcher extends Struct {
public $id;
public $eventResource;
public $stream;
public $signo;
public $callback;
public $wrapper;
public $msDelay = -1;
public $isEnabled = TRUE;
}

View File

@ -18,6 +18,8 @@ class NativeReactor implements Reactor {
private $isRunning = false;
private $onGeneratorError;
private static $instanceCount = 0;
private static $DISABLED_ALARM = 0;
private static $DISABLED_READ = 1;
private static $DISABLED_WRITE = 2;
@ -25,6 +27,7 @@ class NativeReactor implements Reactor {
private static $MICROSECOND = 1000000;
public function __construct() {
self::$instanceCount++;
$this->onGeneratorError = function($e, $r) {
if ($e) {
throw $e;
@ -32,6 +35,22 @@ class NativeReactor implements Reactor {
};
}
public function __debugInfo() {
return [
'timers' => count($this->alarms),
'immediates' => count($this->immediates),
'io_readers' => count($this->readStreams),
'io_writers' => count($this->writeStreams),
'disabled' => count($this->disabledWatchers),
'last_watcher_id' => $this->lastWatcherId,
'instances' => self::$instanceCount,
];
}
public function __destruct() {
self::$instanceCount--;
}
/**
* Start the event reactor and assume program flow control
*

View File

@ -1,12 +0,0 @@
<?php
namespace Amp;
class UvIoWatcher extends Struct {
public $id;
public $mode;
public $poll;
public $stream;
public $callback;
public $isEnabled;
}

View File

@ -1,12 +0,0 @@
<?php
namespace Amp;
class UvPoll extends Struct {
public $flags;
public $handle;
public $callback;
public $readers = [];
public $writers = [];
public $disable = [];
}

View File

@ -18,12 +18,7 @@ class UvReactor implements SignalReactor {
private $immediates = [];
private $onGeneratorError;
private static $MODE_ONCE = 0;
private static $MODE_REPEAT = 1;
private static $MODE_READER = 2;
private static $MODE_WRITER = 3;
private static $MODE_SIGNAL = 4;
private static $MODE_IMMEDIATE = 5;
private static $instanceCount = 0;
public function __construct($newLoop = false) {
$this->loop = $newLoop ? uv_loop_new() : uv_default_loop();
@ -35,6 +30,7 @@ class UvReactor implements SignalReactor {
throw $e;
}
};
self::$instanceCount++;
}
private function collectGarbage() {
@ -142,7 +138,7 @@ class UvReactor implements SignalReactor {
$watcher = new \StdClass;
$watcher->id = $watcherId;
$watcher->mode = self::$MODE_IMMEDIATE;
$watcher->type = Watcher::IMMEDIATE;
$watcher->callback = $callback;
$watcher->isEnabled = true;
@ -159,7 +155,7 @@ class UvReactor implements SignalReactor {
* @return string Returns a unique watcher ID
*/
public function once(callable $callback, $msDelay) {
return $this->startTimer($callback, $msDelay, $msInterval = 0, self::$MODE_ONCE);
return $this->startTimer($callback, $msDelay, $msInterval = 0, Watcher::TIMER_ONCE);
}
/**
@ -176,14 +172,14 @@ class UvReactor implements SignalReactor {
$msInterval = ($msInterval && $msInterval > 0) ? (int) $msInterval : -1;
return ($msInterval === -1)
? $this->watchStream(STDOUT, $callback, self::$MODE_WRITER, true)
: $this->startTimer($callback, $msInterval, $msInterval, self::$MODE_REPEAT);
? $this->watchStream(STDOUT, $callback, Watcher::IO_WRITER, true)
: $this->startTimer($callback, $msInterval, $msInterval, Watcher::TIMER_REPEAT);
}
private function startTimer($callback, $msDelay, $msInterval, $mode) {
private function startTimer($callback, $msDelay, $msInterval, $type) {
$watcher = new UvTimerWatcher;
$watcher->id = (string) $this->lastWatcherId++;
$watcher->mode = $mode;
$watcher->type = $type;
$watcher->uvStruct = uv_timer_init($this->loop);
$watcher->callback = $this->wrapTimerCallback($watcher, $callback);
$watcher->msDelay = ($msDelay > 0) ? (int) $msDelay : 0;
@ -204,7 +200,7 @@ class UvReactor implements SignalReactor {
if ($result instanceof \Generator) {
resolve($result, $this)->when($this->onGeneratorError);
}
if ($watcher->mode === self::$MODE_ONCE) {
if ($watcher->type === Watcher::TIMER_ONCE) {
$this->clearWatcher($watcher->id);
}
} catch (\Exception $e) {
@ -248,7 +244,7 @@ class UvReactor implements SignalReactor {
* @return string Returns a unique watcher ID
*/
public function onReadable($stream, callable $callback, $enableNow = true) {
return $this->watchStream($stream, $callback, self::$MODE_READER, (bool) $enableNow);
return $this->watchStream($stream, $callback, Watcher::IO_READER, (bool) $enableNow);
}
/**
@ -260,10 +256,10 @@ class UvReactor implements SignalReactor {
* @return string Returns a unique watcher ID
*/
public function onWritable($stream, callable $callback, $enableNow = true) {
return $this->watchStream($stream, $callback, self::$MODE_WRITER, (bool) $enableNow);
return $this->watchStream($stream, $callback, Watcher::IO_WRITER, (bool) $enableNow);
}
private function watchStream($stream, callable $callback, $mode, $enableNow) {
private function watchStream($stream, callable $callback, $type, $enableNow) {
$streamId = (int) $stream;
$poll = isset($this->streamIdPollMap[$streamId])
? $this->streamIdPollMap[$streamId]
@ -272,7 +268,7 @@ class UvReactor implements SignalReactor {
$watcherId = $this->lastWatcherId;
$this->watchers[$watcherId] = $watcher = new UvIoWatcher;
$watcher->id = $watcherId = $this->lastWatcherId++;
$watcher->mode = $mode;
$watcher->type = $type;
$watcher->poll = $poll;
$watcher->stream = $stream;
$watcher->callback = $callback;
@ -283,7 +279,7 @@ class UvReactor implements SignalReactor {
return $watcherId;
}
if ($mode === self::$MODE_READER) {
if ($type === Watcher::IO_READER) {
$poll->readers[$watcherId] = $watcher;
} else {
$poll->writers[$watcherId] = $watcher;
@ -362,7 +358,7 @@ class UvReactor implements SignalReactor {
public function onSignal($signo, callable $onSignal) {
$watcher = new UvSignalWatcher;
$watcher->id = (string) $this->lastWatcherId++;
$watcher->mode = self::$MODE_SIGNAL;
$watcher->type = Watcher::SIGNAL;
$watcher->signo = $signo;
$watcher->uvStruct = uv_signal_init($this->loop);
$watcher->callback = $this->wrapSignalCallback($watcher, $onSignal);
@ -406,16 +402,16 @@ class UvReactor implements SignalReactor {
unset($this->watchers[$watcherId]);
if ($watcher->isEnabled) {
switch ($watcher->mode) {
case self::$MODE_READER:
switch ($watcher->type) {
case Watcher::IO_READER:
// fallthrough
case self::$MODE_WRITER:
case Watcher::IO_WRITER:
$this->clearPollFromWatcher($watcher);
break;
case self::$MODE_SIGNAL:
case Watcher::SIGNAL:
uv_signal_stop($watcher->uvStruct);
break;
case self::$MODE_IMMEDIATE:
case Watcher::IMMEDIATE:
unset($this->immediates[$watcherId]);
break;
default:
@ -479,16 +475,16 @@ class UvReactor implements SignalReactor {
return;
}
switch ($watcher->mode) {
case self::$MODE_READER:
switch ($watcher->type) {
case Watcher::IO_READER:
// fallthrough
case self::$MODE_WRITER:
case Watcher::IO_WRITER:
$this->disablePollFromWatcher($watcher);
break;
case self::$MODE_SIGNAL:
case Watcher::SIGNAL:
uv_signal_stop($watcher->uvStruct);
break;
case self::$MODE_IMMEDIATE:
case Watcher::IMMEDIATE:
unset($this->immediates[$watcher->id]);
break;
default:
@ -546,16 +542,16 @@ class UvReactor implements SignalReactor {
return;
}
switch ($watcher->mode) {
case self::$MODE_READER:
switch ($watcher->type) {
case Watcher::IO_READER:
// fallthrough
case self::$MODE_WRITER:
case Watcher::IO_WRITER:
$this->enablePollFromWatcher($watcher);
break;
case self::$MODE_SIGNAL:
case Watcher::SIGNAL:
uv_signal_start($watcher->uvStruct, $watcher->callback, $watcher->signo);
break;
case self::$MODE_IMMEDIATE:
case Watcher::IMMEDIATE:
$this->immediates[$watcher->id] = $watcher->callback;
break;
default:
@ -574,7 +570,7 @@ class UvReactor implements SignalReactor {
$preexistingFlags = $poll->flags;
if ($watcher->mode === self::$MODE_READER) {
if ($watcher->type === Watcher::IO_READER) {
$poll->flags |= \UV::READABLE;
$poll->readers[$watcherId] = $watcher;
} else {
@ -599,4 +595,89 @@ class UvReactor implements SignalReactor {
public function getUnderlyingLoop() {
return $this->loop;
}
public function __destruct() {
self::$instanceCount--;
}
public function __debugInfo() {
$immediates = $timers = $readers = $writers = $signals = $disabled = 0;
foreach ($this->watchers as $watcher) {
switch ($watcher->type) {
case Watcher::IMMEDIATE:
$immediates++;
break;
case Watcher::TIMER_ONCE:
case Watcher::TIMER_REPEAT:
$timers++;
break;
case Watcher::IO_READER:
$readers++;
break;
case Watcher::IO_WRITER:
$writers++;
break;
case Watcher::SIGNAL:
$signals++;
break;
default:
throw new \DomainException(
"Unexpected watcher type: {$watcher->type}"
);
}
$disabled += !$watcher->isEnabled;
}
return [
'timers' => $timers,
'immediates' => $immediates,
'io_readers' => $readers,
'io_writers' => $writers,
'signals' => $signals,
'disabled' => $disabled,
'last_watcher_id' => $this->lastWatcherId,
'instances' => self::$instanceCount,
];
}
}
class UvIoWatcher extends Watcher {
// Inherited:
// public $id;
// public $type;
// public $isEnabled;
public $poll;
public $stream;
public $callback;
}
class UvPoll extends Struct {
public $flags;
public $handle;
public $callback;
public $readers = [];
public $writers = [];
public $disable = [];
}
class UvSignalWatcher extends Watcher {
// Inherited:
// public $id;
// public $type;
// public $isEnabled;
public $signo;
public $uvStruct;
public $callback;
}
class UvTimerWatcher extends Watcher {
// Inherited:
// public $id;
// public $type;
// public $isEnabled;
public $uvStruct;
public $callback;
public $msDelay;
public $msInterval;
}

View File

@ -1,12 +0,0 @@
<?php
namespace Amp;
class UvSignalWatcher extends Struct {
public $id;
public $mode;
public $signo;
public $uvStruct;
public $callback;
public $isEnabled;
}

View File

@ -1,13 +0,0 @@
<?php
namespace Amp;
class UvTimerWatcher extends Struct {
public $id;
public $uvStruct;
public $callback;
public $msDelay;
public $msInterval;
public $mode;
public $isEnabled;
}

18
lib/Watcher.php Normal file
View File

@ -0,0 +1,18 @@
<?php
namespace Amp;
abstract class Watcher extends Struct {
const IMMEDIATE = 0b00000001;
const TIMER = 0b00000010;
const TIMER_ONCE = 0b00000110;
const TIMER_REPEAT = 0b00001010;
const IO = 0b00010000;
const IO_READER = 0b00110000;
const IO_WRITER = 0b01010000;
const SIGNAL = 0b10000000;
public $id;
public $type;
public $isEnabled;
}