1
0
mirror of https://github.com/danog/amp.git synced 2025-01-21 21:01:16 +01:00

Add Registry; support streams with multiple watchers

This commit is contained in:
Aaron Piotrowski 2016-06-02 09:45:04 -05:00
parent 4aa8c5d550
commit acbe2c8237

View File

@ -4,9 +4,12 @@ namespace Amp\Loop;
use Amp\Loop\Internal\Watcher;
use Interop\Async\Loop\Driver;
use Interop\Async\Loop\Registry;
use Interop\Async\Loop\UnsupportedFeatureException;
class NativeLoop implements Driver {
use Registry;
const MILLISEC_PER_SEC = 1e3;
const MICROSEC_PER_SEC = 1e6;
@ -31,7 +34,7 @@ class NativeLoop implements Driver {
private $readStreams = [];
/**
* @var string[]
* @var \Amp\Loop\Internal\Watcher[][]
*/
private $readWatchers = [];
@ -41,7 +44,7 @@ class NativeLoop implements Driver {
private $writeStreams = [];
/**
* @var string[]
* @var \Amp\Loop\Internal\Watcher[][]
*/
private $writeWatchers = [];
@ -56,7 +59,7 @@ class NativeLoop implements Driver {
private $timerQueue;
/**
* @var string[][]
* @var \Amp\Loop\Internal\Watcher[][]
*/
private $signalWatchers = [];
@ -190,22 +193,22 @@ class NativeLoop implements Driver {
if ($count) {
foreach ($read as $stream) {
$key = (int) $stream;
if (isset($this->readStreams[$key], $this->readWatchers[$key])) {
$watcher = $this->watchers[$this->readWatchers[$key]];
$callback = $watcher->callback;
$callback($watcher->id, $stream, $watcher->data);
$streamId = (int) $stream;
if (isset($this->readWatchers[$streamId])) {
foreach ($this->readWatchers[$streamId] as $watcher) {
$callback = $watcher->callback;
$callback($watcher->id, $stream, $watcher->data);
}
}
}
foreach ($write as $stream) {
$key = (int) $stream;
if (isset($this->writeStreams[$key], $this->writeWatchers[$key])) {
$watcher = $this->watchers[$this->writeWatchers[$key]];
$callback = $watcher->callback;
$callback($watcher->id, $stream, $watcher->data);
$streamId = (int) $stream;
if (isset($this->writeWatchers[$streamId])) {
foreach ($this->writeWatchers[$streamId] as $watcher) {
$callback = $watcher->callback;
$callback($watcher->id, $stream, $watcher->data);
}
}
}
}
@ -380,9 +383,9 @@ class NativeLoop implements Driver {
$watcher->data = $data;
$this->watchers[$watcher->id] = $watcher;
$key = (int) $watcher->value;
$this->readWatchers[$key] = $watcher->id;
$this->readStreams[$key] = $watcher->value;
$streamId = (int) $watcher->value;
$this->readWatchers[$streamId][$watcher->id] = $watcher;
$this->readStreams[$streamId] = $watcher->value;
return $watcher->id;
}
@ -399,9 +402,9 @@ class NativeLoop implements Driver {
$watcher->data = $data;
$this->watchers[$watcher->id] = $watcher;
$key = (int) $watcher->value;
$this->writeWatchers[$key] = $watcher->id;
$this->writeStreams[$key] = $watcher->value;
$streamId = (int) $watcher->value;
$this->writeWatchers[$streamId][$watcher->id] = $watcher;
$this->writeStreams[$streamId] = $watcher->value;
return $watcher->id;
}
@ -438,24 +441,20 @@ class NativeLoop implements Driver {
private function enableSignal(Watcher $watcher) {
if (!isset($this->signalWatchers[$watcher->value])) {
if (!@\pcntl_signal($watcher->value, function ($signo) {
foreach ($this->signalWatchers[$signo] as $id) {
if (!isset($this->watchers[$id])) {
foreach ($this->signalWatchers[$signo] as $watcher) {
if (!isset($this->watchers[$watcher->id])) {
continue;
}
$watcher = $this->watchers[$id];
$callback = $watcher->callback;
$callback($watcher->id, $signo, $watcher->data);
}
})) {
throw new SignalHandlerException("Failed to register signal handler");
}
$this->signalWatchers[$watcher->value] = [];
}
$this->signalWatchers[$watcher->value][$watcher->id] = $watcher->id;
$this->signalWatchers[$watcher->value][$watcher->id] = $watcher;
}
/**
@ -491,15 +490,15 @@ class NativeLoop implements Driver {
switch ($watcher->type) {
case Watcher::READABLE:
$key = (int) $watcher->value;
$this->readWatchers[$key] = $watcher->id;
$this->readStreams[$key] = $watcher->value;
$streamId = (int) $watcher->value;
$this->readWatchers[$streamId][$watcher->id] = $watcher->id;
$this->readStreams[$streamId] = $watcher->value;
break;
case Watcher::WRITABLE:
$key = (int) $watcher->value;
$this->writeWatchers[$key] = $watcher->id;
$this->writeStreams[$key] = $watcher->value;
$streamId = (int) $watcher->value;
$this->writeWatchers[$streamId][$watcher->id] = $watcher->id;
$this->writeStreams[$streamId] = $watcher->value;
break;
case Watcher::DELAY:
@ -535,13 +534,19 @@ class NativeLoop implements Driver {
switch ($watcher->type) {
case Watcher::READABLE:
$key = (int) $watcher->value;
unset($this->readWatchers[$key], $this->readStreams[$key]);
$streamId = (int) $watcher->value;
unset($this->readWatchers[$streamId][$watcher->id]);
if (empty($this->readWatchers[$streamId])) {
unset($this->writeWatchers[$streamId], $this->readStreams[$streamId]);
}
break;
case Watcher::WRITABLE:
$key = (int) $watcher->value;
unset($this->writeWatchers[$key], $this->writeStreams[$key]);
$streamId = (int) $watcher->value;
unset($this->writeWatchers[$streamId][$watcher->id]);
if (empty($this->writeWatchers[$streamId])) {
unset($this->writeWatchers[$streamId], $this->writeStreams[$streamId]);
}
break;
case Watcher::DELAY:
@ -602,7 +607,7 @@ class NativeLoop implements Driver {
foreach ($this->watchers as $watcher) {
switch ($watcher->type) {
case Watcher::READABLE:
if (isset($this->readWatchers[(int) $watcher->value])) {
if (isset($this->readWatchers[(int) $watcher->value][$watcher->id])) {
++$onReadable["enabled"];
} else {
++$onReadable["disabled"];
@ -610,7 +615,7 @@ class NativeLoop implements Driver {
break;
case Watcher::WRITABLE:
if (isset($this->writeWatchers[(int) $watcher->value])) {
if (isset($this->writeWatchers[(int) $watcher->value][$watcher->id])) {
++$onWritable["enabled"];
} else {
++$onWritable["disabled"];
@ -636,13 +641,13 @@ class NativeLoop implements Driver {
}
return [
"watchers" => $watchers,
"defer" => $defer,
"delay" => $delay,
"repeat" => $repeat,
"on_readable" => $onReadable,
"on_writable" => $onWritable,
"on_signal" => $onSignal,
"watchers" => $watchers,
"running" => $this->running,
];
}