1
0
mirror of https://github.com/danog/amp.git synced 2024-12-13 01:47:33 +01:00
amp/lib/Loop/NativeLoop.php

319 lines
10 KiB
PHP
Raw Normal View History

2016-05-19 18:12:35 +02:00
<?php
namespace Amp\Loop;
use Amp\Coroutine;
use Amp\Promise;
use Amp\Internal\Watcher;
use function Amp\rethrow;
2016-05-19 18:12:35 +02:00
2017-03-10 19:19:32 +01:00
class NativeLoop extends Driver {
2017-02-17 05:36:32 +01:00
/** @var resource[] Streams with at least one active read watcher, indexed by the stream cast to int. */
2016-05-19 18:12:35 +02:00
private $readStreams = [];
/** @var \Amp\Internal\Watcher[][] Active read watchers, indexed by the stream cast to int, then by watcher ID. */
2016-05-19 18:12:35 +02:00
private $readWatchers = [];
2017-02-17 05:36:32 +01:00
/** @var resource[] Streams with at least one active write watcher, indexed by the stream cast to int. */
2016-05-19 18:12:35 +02:00
private $writeStreams = [];
/** @var \Amp\Internal\Watcher[][] Active write watchers, indexed by the stream cast to int, then by watcher ID. */
2016-05-19 18:12:35 +02:00
private $writeWatchers = [];
2017-02-17 05:36:32 +01:00
/** @var int[] Expiration time in milliseconds of each active timer, indexed by watcher ID. */
2016-05-19 18:12:35 +02:00
private $timerExpires = [];
2017-02-17 05:36:32 +01:00
/** @var \SplPriorityQueue Queue of [watcher, expiration] pairs inserted with priority -expiration. */
2016-05-19 18:12:35 +02:00
private $timerQueue;
/** @var \Amp\Internal\Watcher[][] Active signal watchers, indexed by signal number, then by watcher ID. */
2016-05-19 18:12:35 +02:00
private $signalWatchers = [];
2017-02-17 05:36:32 +01:00
/** @var bool Whether the pcntl extension is loaded. */
2016-05-19 18:12:35 +02:00
private $signalHandling;
/**
 * Records whether the pcntl extension is loaded and creates the empty timer queue.
 */
public function __construct() {
    $this->signalHandling = \extension_loaded("pcntl");
    $this->timerQueue = new \SplPriorityQueue;
}
/**
 * {@inheritdoc}
 *
 * Delegates to the parent implementation once pcntl support has been confirmed.
 *
 * @throws \Amp\Loop\UnsupportedFeatureException If the pcntl extension is not available.
 * @throws \RuntimeException If creating the backend signal handler fails.
 */
public function onSignal(int $signo, callable $callback, $data = null): string {
    if ($this->signalHandling) {
        return parent::onSignal($signo, $callback, $data);
    }

    throw new UnsupportedFeatureException("Signal handling requires the pcntl extension");
}
/**
 * {@inheritdoc}
 *
 * @return null This driver always returns null.
 */
public function getHandle() {
return null;
}
/**
 * Runs one pass of the loop: selects on the watched streams, invokes every timer that is due and
 * finally dispatches pending signals when the pcntl extension is loaded.
 *
 * @param bool $blocking If true, stream selection is given the timeout reported by getTimeout();
 *     otherwise a timeout of zero is used.
 */
protected function dispatch(bool $blocking) {
    $timeout = 0;
    if ($blocking) {
        $timeout = $this->getTimeout();
    }

    $this->selectStreams($this->readStreams, $this->writeStreams, $timeout);

    if ($this->timerExpires) {
        $now = (int) (\microtime(true) * self::MILLISEC_PER_SEC);

        while (!$this->timerQueue->isEmpty()) {
            $entry = $this->timerQueue->top();
            $watcher = $entry[0];
            $expiration = $entry[1];
            $id = $watcher->id;

            // A queue entry is only valid while it matches the expiration currently recorded for the watcher.
            $valid = isset($this->timerExpires[$id]) && $expiration === $this->timerExpires[$id];
            if (!$valid) {
                $this->timerQueue->extract(); // Timer was removed from queue.
                continue;
            }

            if ($expiration > $now) { // Timer at top of queue has not expired.
                break;
            }

            $this->timerQueue->extract();

            // Re-queue repeating timers and cancel one-shot timers before the callback runs.
            if ($watcher->type & Watcher::REPEAT) {
                $this->activate([$watcher]);
            } else {
                $this->cancel($id);
            }

            // Execute the timer.
            $callback = $watcher->callback;
            $outcome = $callback($id, $watcher->data);

            if ($outcome instanceof \Generator) {
                $outcome = new Coroutine($outcome);
            }

            if ($outcome instanceof Promise) {
                rethrow($outcome);
            }
        }
    }

    if ($this->signalHandling) {
        \pcntl_signal_dispatch();
    }
}
/**
 * Waits for activity on the given streams and invokes the callbacks of the read and write watchers
 * registered for each stream reported as ready. When there are no streams, it sleeps instead.
 *
 * @param resource[] $read Streams to watch for readability.
 * @param resource[] $write Streams to watch for writability.
 * @param int $timeout Maximum time to wait in milliseconds; a negative value waits without a time limit.
 */
private function selectStreams(array $read, array $write, $timeout) {
    $timeout /= self::MILLISEC_PER_SEC; // Seconds from here on, possibly fractional.

    if (empty($read) && empty($write)) {
        if ($timeout < 0) {
            // No streams and no time limit: sleep instead of returning immediately, which would turn a
            // blocking dispatch into a busy loop. This mirrors the unlimited wait stream_select() performs
            // below for a negative timeout; a delivered signal is expected to interrupt the sleep.
            \usleep(\PHP_INT_MAX);
            return;
        }

        if ($timeout > 0) { // Sleep until the timeout elapses.
            // usleep() takes an integer; cast explicitly rather than relying on implicit float conversion.
            \usleep((int) ($timeout * self::MICROSEC_PER_SEC));
        }

        return;
    }

    // Use stream_select() since there are streams in the loop.
    if ($timeout >= 0) {
        $seconds = (int) $timeout;
        $microseconds = (int) (($timeout - $seconds) * self::MICROSEC_PER_SEC);
    } else {
        // A null timeout makes stream_select() wait without a time limit.
        $seconds = null;
        $microseconds = null;
    }

    $except = null;

    // Error reporting suppressed since stream_select() emits an E_WARNING if it is interrupted by a signal.
    if (!@\stream_select($read, $write, $except, $seconds, $microseconds)) {
        return; // Failure, interruption or no stream became ready.
    }

    foreach ($read as $stream) {
        $streamId = (int) $stream;
        if (!isset($this->readWatchers[$streamId])) {
            continue;
        }

        foreach ($this->readWatchers[$streamId] as $watcher) {
            if (!isset($this->readWatchers[$streamId][$watcher->id])) {
                continue; // Watcher disabled by another IO watcher.
            }

            $callback = $watcher->callback;
            $result = $callback($watcher->id, $stream, $watcher->data);

            if ($result instanceof \Generator) {
                $result = new Coroutine($result);
            }

            if ($result instanceof Promise) {
                rethrow($result);
            }
        }
    }

    foreach ($write as $stream) {
        $streamId = (int) $stream;
        if (!isset($this->writeWatchers[$streamId])) {
            continue;
        }

        foreach ($this->writeWatchers[$streamId] as $watcher) {
            if (!isset($this->writeWatchers[$streamId][$watcher->id])) {
                continue; // Watcher disabled by another IO watcher.
            }

            $callback = $watcher->callback;
            $result = $callback($watcher->id, $stream, $watcher->data);

            if ($result instanceof \Generator) {
                $result = new Coroutine($result);
            }

            if ($result instanceof Promise) {
                rethrow($result);
            }
        }
    }
}
/**
 * Computes how long the loop may wait before the timer at the top of the queue is due, discarding
 * queue entries that no longer match a recorded expiration along the way.
 *
 * @return int Milliseconds until the next timer expires (never negative) or -1 if there are no pending timers.
 */
private function getTimeout() {
    for (;;) {
        if ($this->timerQueue->isEmpty()) {
            return -1;
        }

        $entry = $this->timerQueue->top();
        $watcher = $entry[0];
        $expiration = $entry[1];
        $id = $watcher->id;

        if (isset($this->timerExpires[$id]) && $expiration === $this->timerExpires[$id]) {
            $remaining = $expiration - (int) (\microtime(true) * self::MILLISEC_PER_SEC);

            return $remaining < 0 ? 0 : $remaining;
        }

        $this->timerQueue->extract(); // Timer was removed from queue.
    }
}
2016-06-07 19:24:53 +02:00
/**
 * {@inheritdoc}
 *
 * Registers each watcher in the structure matching its type: stream watchers by stream, timers in the
 * timer queue, and signal watchers by signal number (installing the pcntl handler for a signal on first use).
 *
 * @throws \RuntimeException If the signal handler cannot be registered.
 * @throws \DomainException If a watcher has an unknown type.
 */
protected function activate(array $watchers) {
    foreach ($watchers as $watcher) {
        switch ($watcher->type) {
            case Watcher::READABLE:
                $key = (int) $watcher->value;
                $this->readWatchers[$key][$watcher->id] = $watcher;
                $this->readStreams[$key] = $watcher->value;
                break;

            case Watcher::WRITABLE:
                $key = (int) $watcher->value;
                $this->writeWatchers[$key][$watcher->id] = $watcher;
                $this->writeStreams[$key] = $watcher->value;
                break;

            case Watcher::DELAY:
            case Watcher::REPEAT:
                $now = (int) (\microtime(true) * self::MILLISEC_PER_SEC);
                $expiresAt = $now + $watcher->value;
                $this->timerExpires[$watcher->id] = $expiresAt;
                // Negated priority so that the earliest expiration sits at the top of the queue.
                $this->timerQueue->insert([$watcher, $expiresAt], -$expiresAt);
                break;

            case Watcher::SIGNAL:
                $signo = $watcher->value;
                // Only the first watcher for a signal installs the handler.
                if (!isset($this->signalWatchers[$signo])
                    && !@\pcntl_signal($signo, [$this, 'handleSignal'])
                ) {
                    throw new \RuntimeException("Failed to register signal handler");
                }
                $this->signalWatchers[$signo][$watcher->id] = $watcher;
                break;

            default:
                throw new \DomainException("Unknown watcher type");
        }
    }
}
/**
 * {@inheritdoc}
 *
 * Removes the watcher from the structure matching its type. Stream and signal entries are dropped once
 * their last watcher is gone; for signals the default handler is restored at that point.
 *
 * @throws \DomainException If the watcher has an unknown type.
 */
protected function deactivate(Watcher $watcher) {
    switch ($watcher->type) {
        case Watcher::READABLE:
            $key = (int) $watcher->value;
            unset($this->readWatchers[$key][$watcher->id]);
            if (empty($this->readWatchers[$key])) {
                unset($this->readWatchers[$key]);
                unset($this->readStreams[$key]);
            }
            break;

        case Watcher::WRITABLE:
            $key = (int) $watcher->value;
            unset($this->writeWatchers[$key][$watcher->id]);
            if (empty($this->writeWatchers[$key])) {
                unset($this->writeWatchers[$key]);
                unset($this->writeStreams[$key]);
            }
            break;

        case Watcher::DELAY:
        case Watcher::REPEAT:
            // The queue entry is left in place; it is discarded later when it no longer matches.
            unset($this->timerExpires[$watcher->id]);
            break;

        case Watcher::SIGNAL:
            $signo = $watcher->value;
            if (!isset($this->signalWatchers[$signo])) {
                break;
            }

            unset($this->signalWatchers[$signo][$watcher->id]);

            if (empty($this->signalWatchers[$signo])) {
                unset($this->signalWatchers[$signo]);
                @\pcntl_signal($signo, \SIG_DFL);
            }
            break;

        default:
            throw new \DomainException("Unknown watcher type");
    }
}
2017-01-07 13:45:03 +01:00
2016-12-29 21:40:12 +01:00
/**
 * Invokes the callback of every watcher still registered for the given signal.
 *
 * @param int $signo Number of the signal being handled.
 */
private function handleSignal(int $signo) {
    foreach ($this->signalWatchers[$signo] as $watcher) {
        $id = $watcher->id;

        // Skip watchers that an earlier callback in this pass has removed.
        if (isset($this->signalWatchers[$signo][$id])) {
            $callback = $watcher->callback;
            $outcome = $callback($id, $signo, $watcher->data);

            if ($outcome instanceof \Generator) {
                $outcome = new Coroutine($outcome);
            }

            if ($outcome instanceof Promise) {
                rethrow($outcome);
            }
        }
    }
}
}