2016-05-19 18:12:35 +02:00
|
|
|
<?php
|
|
|
|
|
|
|
|
namespace Amp\Loop;
|
|
|
|
|
2016-05-26 06:09:53 +02:00
|
|
|
use Amp\Loop\Internal\Watcher;
|
2016-05-27 17:16:36 +02:00
|
|
|
use Interop\Async\Loop\UnsupportedFeatureException;
|
2016-05-19 18:12:35 +02:00
|
|
|
|
2016-06-08 16:12:42 +02:00
|
|
|
/**
 * Event loop driver implemented with stream_select() for I/O, an SplPriorityQueue for timers and,
 * when the pcntl extension is loaded, pcntl_signal() / pcntl_signal_dispatch() for signals.
 */
class NativeLoop extends Loop {
    /**
     * Streams currently watched for readability, keyed by the integer cast of the stream resource.
     *
     * @var resource[]
     */
    private $readStreams = [];

    /**
     * Active readable watchers, keyed first by stream ID and then by watcher ID.
     *
     * @var \Amp\Loop\Internal\Watcher[][]
     */
    private $readWatchers = [];

    /**
     * Streams currently watched for writability, keyed by the integer cast of the stream resource.
     *
     * @var resource[]
     */
    private $writeStreams = [];

    /**
     * Active writable watchers, keyed first by stream ID and then by watcher ID.
     *
     * @var \Amp\Loop\Internal\Watcher[][]
     */
    private $writeWatchers = [];

    /**
     * Current expiration time (milliseconds) of every active timer, keyed by watcher ID. This is the
     * source of truth for timers: queue entries that disagree with it are stale.
     *
     * @var int[]
     */
    private $timerExpires = [];

    /**
     * Queue of [watcher, expiration] pairs inserted with priority -expiration, so the earliest
     * expiration is on top. Entries are never removed on deactivation; they are discarded lazily
     * when they reach the top and no longer match $timerExpires.
     *
     * @var \SplPriorityQueue
     */
    private $timerQueue;

    /**
     * Active signal watchers, keyed first by signal number and then by watcher ID.
     *
     * @var \Amp\Loop\Internal\Watcher[][]
     */
    private $signalWatchers = [];

    /**
     * Whether the pcntl extension is loaded, i.e. whether signal watchers can be used.
     *
     * @var bool
     */
    private $signalHandling;

    public function __construct() {
        $this->timerQueue = new \SplPriorityQueue();
        $this->signalHandling = \extension_loaded("pcntl");
    }

    /**
     * Performs a single pass: waits for / polls stream activity, invokes expired timers and finally
     * dispatches pending signals if signal handling is available.
     *
     * @param bool $blocking If true, waiting is bounded by the next timer expiration (see getTimeout());
     *     otherwise streams are polled without waiting.
     */
    protected function dispatch($blocking) {
        $this->selectStreams(
            $this->readStreams,
            $this->writeStreams,
            $blocking ? $this->getTimeout() : 0
        );

        if (!empty($this->timerExpires)) {
            // A single timestamp is taken for the whole pass, so timers re-armed by activate() below
            // (which expire relative to a later clock reading) are not fired again in this pass
            // unless their interval is zero or negative.
            $time = (int) (\microtime(true) * self::MILLISEC_PER_SEC);

            while (!$this->timerQueue->isEmpty()) {
                list($watcher, $expiration) = $this->timerQueue->top();

                $id = $watcher->id;

                if (!isset($this->timerExpires[$id]) || $expiration !== $this->timerExpires[$id]) {
                    // Stale entry: the timer was deactivated or re-armed with another expiration.
                    $this->timerQueue->extract();
                    continue;
                }

                if ($this->timerExpires[$id] > $time) { // Timer at top of queue has not expired.
                    break;
                }

                $this->timerQueue->extract();

                // Re-arm or cancel before invoking the callback so the callback observes (and may
                // change) the watcher's final state.
                if ($watcher->type & Watcher::REPEAT) {
                    $this->activate([$watcher]);
                } else {
                    $this->cancel($id);
                }

                // Execute the timer.
                $callback = $watcher->callback;
                $callback($id, $watcher->data);
            }
        }

        if ($this->signalHandling) {
            \pcntl_signal_dispatch();
        }
    }

    /**
     * Waits for activity on the given streams and invokes the matching watcher callbacks. When no
     * streams are given, sleeps instead of selecting.
     *
     * @param resource[] $read Streams to check for readability.
     * @param resource[] $write Streams to check for writability.
     * @param int $timeout Maximum time to wait in milliseconds; 0 polls without waiting and a negative
     *     value waits without a time limit.
     */
    private function selectStreams(array $read, array $write, $timeout) {
        $timeout /= self::MILLISEC_PER_SEC; // From here on $timeout is in (fractional) seconds.

        if (!empty($read) || !empty($write)) { // Use stream_select() if there are any streams in the loop.
            if ($timeout >= 0) {
                $seconds = (int) $timeout;
                $microseconds = (int) (($timeout - $seconds) * self::MICROSEC_PER_SEC);
            } else {
                // A null timeout makes stream_select() block until a stream is ready or it is interrupted.
                $seconds = null;
                $microseconds = null;
            }

            $except = null;

            // Error reporting suppressed since stream_select() emits an E_WARNING if it is interrupted by a signal.
            $count = @\stream_select($read, $write, $except, $seconds, $microseconds);

            if ($count) {
                // stream_select() reduced $read and $write to the streams that are ready.
                foreach ($read as $stream) {
                    $streamId = (int) $stream;
                    if (isset($this->readWatchers[$streamId])) {
                        foreach ($this->readWatchers[$streamId] as $watcher) {
                            // The foreach iterates a snapshot; re-check the live array each time.
                            if (!isset($this->readWatchers[$streamId][$watcher->id])) {
                                continue; // Watcher disabled by another IO watcher.
                            }

                            $callback = $watcher->callback;
                            $callback($watcher->id, $stream, $watcher->data);
                        }
                    }
                }

                foreach ($write as $stream) {
                    $streamId = (int) $stream;
                    if (isset($this->writeWatchers[$streamId])) {
                        foreach ($this->writeWatchers[$streamId] as $watcher) {
                            // The foreach iterates a snapshot; re-check the live array each time.
                            if (!isset($this->writeWatchers[$streamId][$watcher->id])) {
                                continue; // Watcher disabled by another IO watcher.
                            }

                            $callback = $watcher->callback;
                            $callback($watcher->id, $stream, $watcher->data);
                        }
                    }
                }
            }

            return;
        }

        if ($timeout < 0) {
            // No streams and no pending timers: only a signal can change the loop's state. Sleep
            // (usleep() returns early when a handled signal arrives) instead of returning at once,
            // which would make the caller spin at full CPU until a signal is delivered.
            \usleep(\PHP_INT_MAX);
            return;
        }

        if ($timeout > 0) { // Otherwise sleep with usleep() until the next timer is due.
            // Explicit cast: usleep() expects an int and the product may carry a fractional part.
            \usleep((int) ($timeout * self::MICROSEC_PER_SEC));
        }
    }

    /**
     * Discards stale entries from the top of the timer queue and reports how long it is until the
     * earliest remaining timer expires.
     *
     * @return int Milliseconds until next timer expires (0 if it is already due) or -1 if there are no
     *     pending timers.
     */
    private function getTimeout() {
        while (!$this->timerQueue->isEmpty()) {
            list($watcher, $expiration) = $this->timerQueue->top();

            $id = $watcher->id;

            if (!isset($this->timerExpires[$id]) || $expiration !== $this->timerExpires[$id]) {
                // Stale entry: the timer was deactivated or re-armed with another expiration.
                $this->timerQueue->extract();
                continue;
            }

            $expiration -= (int) (\microtime(true) * self::MILLISEC_PER_SEC);

            if ($expiration < 0) {
                return 0;
            }

            return $expiration;
        }

        return -1;
    }

    /**
     * {@inheritdoc}
     *
     * @throws \Interop\Async\Loop\UnsupportedFeatureException If the pcntl extension is not available.
     * @throws \RuntimeException If creating the backend signal handler fails.
     */
    public function onSignal($signo, callable $callback, $data = null) {
        if (!$this->signalHandling) {
            throw new UnsupportedFeatureException("Signal handling requires the pcntl extension");
        }

        return parent::onSignal($signo, $callback, $data);
    }

    /**
     * {@inheritdoc}
     *
     * Registers each watcher in the bookkeeping structure for its type: stream watchers in the
     * read/write maps, timers in the expiration map and queue, signal watchers in the signal map
     * (installing the process-level handler for the first watcher of a signal).
     *
     * @throws \RuntimeException If pcntl_signal() fails to register the signal handler.
     * @throws \DomainException If a watcher has an unrecognised type.
     */
    protected function activate(array $watchers) {
        foreach ($watchers as $watcher) {
            switch ($watcher->type) {
                case Watcher::READABLE:
                    $streamId = (int) $watcher->value;
                    $this->readWatchers[$streamId][$watcher->id] = $watcher;
                    $this->readStreams[$streamId] = $watcher->value;
                    break;

                case Watcher::WRITABLE:
                    $streamId = (int) $watcher->value;
                    $this->writeWatchers[$streamId][$watcher->id] = $watcher;
                    $this->writeStreams[$streamId] = $watcher->value;
                    break;

                case Watcher::DELAY:
                case Watcher::REPEAT:
                    // $watcher->value is added to the current time in milliseconds to get the expiration.
                    $expiration = (int) (\microtime(true) * self::MILLISEC_PER_SEC) + $watcher->value;
                    $this->timerExpires[$watcher->id] = $expiration;
                    // Negated priority turns SplPriorityQueue's max-heap into "earliest first".
                    $this->timerQueue->insert([$watcher, $expiration], -$expiration);
                    break;

                case Watcher::SIGNAL:
                    if (!isset($this->signalWatchers[$watcher->value])) {
                        // First watcher for this signal: install the shared handler.
                        if (!@\pcntl_signal($watcher->value, [$this, 'handleSignal'])) {
                            throw new \RuntimeException("Failed to register signal handler");
                        }
                    }

                    $this->signalWatchers[$watcher->value][$watcher->id] = $watcher;
                    break;

                default:
                    throw new \DomainException("Unknown watcher type");
            }
        }
    }

    /**
     * {@inheritdoc}
     *
     * Removes the watcher from the bookkeeping structure for its type. Timer queue entries are left
     * in place and skipped later because their watcher ID is no longer present in $timerExpires.
     *
     * @throws \DomainException If the watcher has an unrecognised type.
     */
    public function deactivate(Watcher $watcher) {
        switch ($watcher->type) {
            case Watcher::READABLE:
                $streamId = (int) $watcher->value;
                unset($this->readWatchers[$streamId][$watcher->id]);
                if (empty($this->readWatchers[$streamId])) {
                    // Last watcher for the stream: stop selecting on it.
                    unset($this->readWatchers[$streamId], $this->readStreams[$streamId]);
                }
                break;

            case Watcher::WRITABLE:
                $streamId = (int) $watcher->value;
                unset($this->writeWatchers[$streamId][$watcher->id]);
                if (empty($this->writeWatchers[$streamId])) {
                    // Last watcher for the stream: stop selecting on it.
                    unset($this->writeWatchers[$streamId], $this->writeStreams[$streamId]);
                }
                break;

            case Watcher::DELAY:
            case Watcher::REPEAT:
                unset($this->timerExpires[$watcher->id]);
                break;

            case Watcher::SIGNAL:
                if (isset($this->signalWatchers[$watcher->value])) {
                    unset($this->signalWatchers[$watcher->value][$watcher->id]);

                    if (empty($this->signalWatchers[$watcher->value])) {
                        // Last watcher for the signal: restore the default disposition.
                        unset($this->signalWatchers[$watcher->value]);
                        @\pcntl_signal($watcher->value, \SIG_DFL);
                    }
                }
                break;

            default:
                throw new \DomainException("Unknown watcher type");
        }
    }

    /**
     * Signal handler registered with pcntl_signal(); invokes every active watcher for the signal.
     *
     * @param int $signo
     */
    private function handleSignal($signo) {
        foreach ($this->signalWatchers[$signo] as $watcher) {
            // The foreach iterates a snapshot; skip watchers removed by an earlier callback.
            if (!isset($this->signalWatchers[$signo][$watcher->id])) {
                continue;
            }

            $callback = $watcher->callback;
            $callback($watcher->id, $signo, $watcher->data);
        }
    }

    /**
     * {@inheritdoc}
     *
     * This driver always returns null.
     */
    public function getHandle() {
        return null;
    }
}
|