1
0
mirror of https://github.com/danog/amp.git synced 2024-12-11 17:09:40 +01:00
amp/lib/Loop/NativeDriver.php

360 lines
11 KiB
PHP
Raw Normal View History

2016-05-19 18:12:35 +02:00
<?php
namespace Amp\Loop;
use Amp\Promise;
use function Amp\Internal\getCurrentTime;
2016-05-19 18:12:35 +02:00
2020-11-01 16:43:21 +01:00
final class NativeDriver extends DriverFoundation
2018-06-18 20:00:01 +02:00
{
2017-02-17 05:36:32 +01:00
/**
 * Streams with at least one active readable watcher, keyed by integer stream ID.
 *
 * @var resource[]
 */
private array $readStreams = [];

/**
 * Active readable watchers, keyed by integer stream ID and then by watcher ID.
 *
 * @var Watcher[][]
 */
private array $readWatchers = [];

/**
 * Streams with at least one active writable watcher, keyed by integer stream ID.
 *
 * @var resource[]
 */
private array $writeStreams = [];

/**
 * Active writable watchers, keyed by integer stream ID and then by watcher ID.
 *
 * @var Watcher[][]
 */
private array $writeWatchers = [];

/** Queue holding the active delay and repeat watchers. */
private Internal\TimerQueue $timerQueue;

/**
 * Active signal watchers, keyed by signal number and then by watcher ID.
 *
 * @var Watcher[][]
 */
private array $signalWatchers = [];

/** @var int Internal timestamp for now. */
private int $now;

/** @var int Loop time offset; subtracted from getCurrentTime() to obtain the loop time. */
private int $nowOffset;

/** @var bool Whether the pcntl extension is loaded, i.e. whether signal watchers are supported. */
private bool $signalHandling;
2016-05-19 18:12:35 +02:00
2018-06-18 20:00:01 +02:00
/**
 * Detects signal support, creates the timer queue and seeds the loop clock.
 *
 * The loop clock starts at a random value between 0 and the current time. The remainder is kept in
 * $nowOffset so that now() can derive the loop time from getCurrentTime().
 */
public function __construct()
{
    $this->signalHandling = \extension_loaded("pcntl");
    $this->timerQueue = new Internal\TimerQueue();

    // Start from the current time, carve out a random initial loop time, and keep the rest as the offset.
    $this->nowOffset = getCurrentTime();
    $this->now = \random_int(0, $this->nowOffset);
    $this->nowOffset = $this->nowOffset - $this->now;
}
/**
 * {@inheritdoc}
 *
 * Delegates to the parent implementation once signal support has been confirmed.
 *
 * @throws \Amp\Loop\UnsupportedFeatureException If the pcntl extension is not available.
 */
public function onSignal(int $signo, callable $callback, $data = null): string
{
    if ($this->signalHandling) {
        return parent::onSignal($signo, $callback, $data);
    }

    throw new UnsupportedFeatureException("Signal handling requires the pcntl extension");
}
2018-01-06 03:32:57 +01:00
/**
 * {@inheritdoc}
 *
 * Recomputes the loop time as the current time minus the loop offset, caches it and returns it.
 */
public function now(): int
{
    $loopTime = getCurrentTime() - $this->nowOffset;
    $this->now = $loopTime;

    return $this->now;
}
/**
 * {@inheritdoc}
 *
 * This driver exposes no underlying loop handle, so the result is always null.
 *
 * @return null
 */
public function getHandle()
{
    return null;
}
/**
 * Runs one tick: polls the watched streams, runs the timers handed back by the timer queue for the
 * current loop time, and finally dispatches pending signals when signal handling is available.
 *
 * @param bool $blocking Whether the stream poll may wait until the next timer; when false a zero timeout is used.
 *
 * @return void
 *
 * @throws \Throwable
 */
protected function dispatch(bool $blocking): void
{
    $timeout = 0;
    if ($blocking) {
        $timeout = $this->getTimeout();
    }

    $this->selectStreams($this->readStreams, $this->writeStreams, $timeout);

    $currentTime = $this->now();

    for (;;) {
        $timer = $this->timerQueue->extract($currentTime);
        if (!$timer) {
            break;
        }

        if (!($timer->type & Watcher::REPEAT)) {
            // One-shot timer: cancel it before running its callback.
            $this->cancel($timer->id);
        } else {
            $timer->enabled = false; // Trick base class into adding to enable queue when calling enable()
            $this->enable($timer->id);
        }

        try {
            // Execute the timer.
            $outcome = ($timer->callback)($timer->id, $timer->data);

            if ($outcome instanceof Promise) {
                Promise\rethrow($outcome);
            }
        } catch (\Throwable $failure) {
            $this->error($failure);
        }
    }

    if ($this->signalHandling) {
        \pcntl_signal_dispatch();
    }
}
/**
 * Waits for activity on the given streams and invokes the watchers of the streams reported ready.
 * When no streams are given, it sleeps for the timeout instead.
 *
 * @param resource[] $read Streams to watch for readability.
 * @param resource[] $write Streams to watch for writability.
 * @param int $timeout Maximum wait in milliseconds; 0 does not wait, a negative value waits without a limit.
 *
 * @return void
 */
private function selectStreams(array $read, array $write, int $timeout): void
{
    $timeout /= self::MILLISEC_PER_SEC; // Milliseconds to (fractional) seconds.

    if (!empty($read) || !empty($write)) { // Use stream_select() if there are any streams in the loop.
        if ($timeout >= 0) {
            $seconds = (int) $timeout;
            $microseconds = (int) (($timeout - $seconds) * self::MICROSEC_PER_SEC);
        } else {
            // No timer pending: pass null so stream_select() waits without a time limit.
            $seconds = null;
            $microseconds = null;
        }

        $except = null;

        // Error reporting suppressed since stream_select() emits an E_WARNING if it is interrupted by a signal.
        if (!($result = @\stream_select($read, $write, $except, $seconds, $microseconds))) {
            if ($result === 0) {
                return; // Timed out with no stream ready.
            }

            $error = \error_get_last();

            // NOTE(review): PHP normally prefixes warning messages with the function name
            // ("stream_select(): ...") and PHP 8 capitalises "Unable"; confirm this prefix check can match.
            if (\strpos($error["message"] ?? '', "unable to select") !== 0) {
                return;
            }

            $this->error(new \Exception($error["message"] ?? 'Unknown error during stream_select'));

            // The select failed, so $read and $write cannot be trusted to hold only ready streams.
            // Do not fall through and invoke every watcher's callback.
            return;
        }

        foreach ($read as $stream) {
            $streamId = (int) $stream;
            if (!isset($this->readWatchers[$streamId])) {
                continue; // All read watchers disabled.
            }

            foreach ($this->readWatchers[$streamId] as $watcher) {
                if (!isset($this->readWatchers[$streamId][$watcher->id])) {
                    continue; // Watcher disabled by another IO watcher.
                }

                try {
                    $result = ($watcher->callback)($watcher->id, $stream, $watcher->data);

                    if ($result instanceof Promise) {
                        Promise\rethrow($result);
                    }
                } catch (\Throwable $exception) {
                    $this->error($exception);
                }
            }
        }

        \assert(\is_array($write)); // See https://github.com/vimeo/psalm/issues/3036

        foreach ($write as $stream) {
            $streamId = (int) $stream;
            if (!isset($this->writeWatchers[$streamId])) {
                continue; // All write watchers disabled.
            }

            foreach ($this->writeWatchers[$streamId] as $watcher) {
                if (!isset($this->writeWatchers[$streamId][$watcher->id])) {
                    continue; // Watcher disabled by another IO watcher.
                }

                try {
                    $result = ($watcher->callback)($watcher->id, $stream, $watcher->data);

                    if ($result instanceof Promise) {
                        Promise\rethrow($result);
                    }
                } catch (\Throwable $exception) {
                    $this->error($exception);
                }
            }
        }

        return;
    }

    if ($timeout < 0) { // Only signal watchers are enabled, so sleep indefinitely.
        \usleep(\PHP_INT_MAX);
        return;
    }

    if ($timeout > 0) { // Sleep until next timer expires.
        \usleep((int) ($timeout * self::MICROSEC_PER_SEC));
    }
}
/**
 * Computes how long the loop may wait before the next timer is due.
 *
 * @return int Milliseconds until next timer expires or -1 if there are no pending timers.
 */
private function getTimeout(): int
{
    $nextExpiration = $this->timerQueue->peek();

    if ($nextExpiration === null) {
        return -1;
    }

    // Remaining time relative to the loop clock (current time minus the loop offset).
    $nextExpiration -= getCurrentTime() - $this->nowOffset;

    if ($nextExpiration > 0) {
        return $nextExpiration;
    }

    return 0;
}
2016-06-07 19:24:53 +02:00
/**
 * {@inheritdoc}
 *
 * Registers each watcher with the structure matching its type: the stream maps for I/O watchers, the
 * timer queue for delay and repeat watchers, and a pcntl signal handler for signal watchers.
 *
 * @return void
 *
 * @throws \Error If a signal handler cannot be registered or a watcher has an unknown type.
 */
protected function activate(array $watchers): void
{
    foreach ($watchers as $watcher) {
        switch ($watcher->type) {
            case Watcher::READABLE:
                \assert(\is_resource($watcher->value));

                $stream = $watcher->value;
                $streamId = (int) $stream;
                $this->readStreams[$streamId] = $stream;
                $this->readWatchers[$streamId][$watcher->id] = $watcher;
                break;

            case Watcher::WRITABLE:
                \assert(\is_resource($watcher->value));

                $stream = $watcher->value;
                $streamId = (int) $stream;
                $this->writeStreams[$streamId] = $stream;
                $this->writeWatchers[$streamId][$watcher->id] = $watcher;
                break;

            case Watcher::DELAY:
            case Watcher::REPEAT:
                \assert(\is_int($watcher->value));

                $this->timerQueue->insert($watcher);
                break;

            case Watcher::SIGNAL:
                \assert(\is_int($watcher->value));

                $signo = $watcher->value;

                // The handler is only installed when no watcher is registered yet for this signal number.
                $installed = isset($this->signalWatchers[$signo])
                    || @\pcntl_signal($signo, \Closure::fromCallable([$this, 'handleSignal']));

                if (!$installed) {
                    $message = "Failed to register signal handler";
                    $error = \error_get_last();
                    if ($error) {
                        $message .= \sprintf("; Errno: %d; %s", $error["type"], $error["message"]);
                    }

                    throw new \Error($message);
                }

                $this->signalWatchers[$signo][$watcher->id] = $watcher;
                break;

            default:
                // @codeCoverageIgnoreStart
                throw new \Error("Unknown watcher type");
                // @codeCoverageIgnoreEnd
        }
    }
}
/**
 * {@inheritdoc}
 *
 * Removes the watcher from the structure matching its type. When the last watcher of a stream is removed,
 * the stream is dropped as well; when the last watcher of a signal is removed, the signal's default
 * handler is restored.
 *
 * @return void
 *
 * @throws \Error If the watcher has an unknown type.
 */
protected function deactivate(Watcher $watcher): void
{
    switch ($watcher->type) {
        case Watcher::READABLE:
            $streamId = (int) $watcher->value;
            unset($this->readWatchers[$streamId][$watcher->id]);

            if (!empty($this->readWatchers[$streamId])) {
                break; // Other read watchers still use this stream.
            }

            unset($this->readWatchers[$streamId], $this->readStreams[$streamId]);
            break;

        case Watcher::WRITABLE:
            $streamId = (int) $watcher->value;
            unset($this->writeWatchers[$streamId][$watcher->id]);

            if (!empty($this->writeWatchers[$streamId])) {
                break; // Other write watchers still use this stream.
            }

            unset($this->writeWatchers[$streamId], $this->writeStreams[$streamId]);
            break;

        case Watcher::DELAY:
        case Watcher::REPEAT:
            $this->timerQueue->remove($watcher);
            break;

        case Watcher::SIGNAL:
            \assert(\is_int($watcher->value));

            $signo = $watcher->value;

            if (!isset($this->signalWatchers[$signo])) {
                break; // Nothing registered for this signal.
            }

            unset($this->signalWatchers[$signo][$watcher->id]);

            if (!empty($this->signalWatchers[$signo])) {
                break; // Other watchers still listen for this signal.
            }

            // Last watcher removed: forget the signal and restore its default handler.
            unset($this->signalWatchers[$signo]);
            @\pcntl_signal($signo, \SIG_DFL);
            break;

        default:
            // @codeCoverageIgnoreStart
            throw new \Error("Unknown watcher type");
            // @codeCoverageIgnoreEnd
    }
}
2017-01-07 13:45:03 +01:00
2016-12-29 21:40:12 +01:00
/**
 * Signal handler installed through pcntl_signal(); invokes every watcher registered for the signal.
 *
 * @param int $signo Number of the signal being handled.
 *
 * @return void
 */
private function handleSignal(int $signo): void
{
    foreach ($this->signalWatchers[$signo] as $signalWatcher) {
        // Only run watchers that are still registered; an earlier callback may have removed this one.
        if (isset($this->signalWatchers[$signo][$signalWatcher->id])) {
            try {
                $outcome = ($signalWatcher->callback)($signalWatcher->id, $signo, $signalWatcher->data);

                if ($outcome instanceof Promise) {
                    Promise\rethrow($outcome);
                }
            } catch (\Throwable $failure) {
                $this->error($failure);
            }
        }
    }
}
}