1
0
mirror of https://github.com/danog/amp.git synced 2025-01-23 05:41:25 +01:00
amp/lib/UvReactor.php

697 lines
22 KiB
PHP
Raw Normal View History

2014-06-11 12:33:15 -04:00
<?php
2014-09-22 22:38:32 -04:00
namespace Amp;
2014-06-11 12:33:15 -04:00
2015-03-19 11:14:21 -04:00
class UvReactor implements SignalReactor {
2014-06-11 12:33:15 -04:00
private $loop;
2015-03-19 11:14:21 -04:00
private $lastWatcherId = "a";
private $watchers;
private $enabledWatcherCount = 0;
private $streamIdPollMap = [];
private $isRunning = false;
2014-06-11 12:33:15 -04:00
private $stopException;
private $resolution = 1000;
private $isWindows;
private $immediates = [];
private $onError;
private $onCoroutineResolution;
2014-06-11 12:33:15 -04:00
private static $instanceCount = 0;
2014-11-11 12:00:01 -05:00
public function __construct() {
if (!extension_loaded('uv')) {
throw new \RuntimeException('php-uv extension is required to use the UvReactor.');
}
$this->loop = uv_loop_new();
$this->isWindows = (stripos(PHP_OS, 'win') === 0);
$this->onCoroutineResolution = function($e = null, $r = null) {
if (empty($e)) {
return;
} elseif ($onError = $this->onError) {
$onError($e);
} else {
2014-09-22 16:47:48 -04:00
throw $e;
}
};
self::$instanceCount++;
2014-06-11 12:33:15 -04:00
}
/**
2015-03-19 11:14:21 -04:00
* {@inheritDoc}
* @throws \Exception Will throw if code executed during the event loop throws
2014-06-11 12:33:15 -04:00
*/
public function run(callable $onStart = null) {
2014-06-11 12:33:15 -04:00
if ($this->isRunning) {
return;
}
2014-08-06 00:17:03 -04:00
$this->isRunning = true;
if ($onStart) {
$this->immediately($onStart);
}
while ($this->isRunning) {
if ($this->immediates && !$this->doImmediates()) {
break;
}
if (empty($this->enabledWatcherCount)) {
break;
}
uv_run($this->loop, \UV::RUN_DEFAULT | (empty($this->immediates) ? \UV::RUN_ONCE : \UV::RUN_NOWAIT));
}
2014-06-11 12:33:15 -04:00
if ($this->stopException) {
$e = $this->stopException;
$this->stopException = null;
2014-06-11 12:33:15 -04:00
throw $e;
}
}
private function doImmediates() {
$immediates = $this->immediates;
foreach ($immediates as $watcherId => $watcher) {
try {
$this->enabledWatcherCount--;
unset(
$this->immediates[$watcherId],
$this->watchers[$watcherId]
);
$result = ($watcher->callback)($this, $watcherId, $watcher->callbackData);
if ($result instanceof \Generator) {
resolve($result, $this)->when($this->onCoroutineResolution);
}
} catch (\Exception $e) {
$this->handleRunError($e);
2014-09-22 16:47:48 -04:00
}
if (!$this->isRunning) {
// If a watcher stops the reactor break out of the loop
return false;
}
}
return $this->isRunning;
}
2014-06-11 12:33:15 -04:00
/**
2015-03-19 11:14:21 -04:00
* {@inheritDoc}
2014-06-11 12:33:15 -04:00
*/
2015-03-19 11:14:21 -04:00
public function tick(bool $noWait = false) {
if ($this->isRunning) {
return;
}
2014-08-06 00:17:03 -04:00
$this->isRunning = true;
if (empty($this->immediates) || $this->doImmediates()) {
$flags = $noWait || !empty($this->immediates) ? (\UV::RUN_NOWAIT | \UV::RUN_ONCE) : \UV::RUN_ONCE;
2014-11-26 13:42:36 -05:00
uv_run($this->loop, $flags);
}
$this->isRunning = false;
2014-06-11 12:33:15 -04:00
if ($this->stopException) {
$e = $this->stopException;
$this->stopException = null;
2014-06-11 12:33:15 -04:00
throw $e;
}
}
/**
2015-03-19 11:14:21 -04:00
* {@inheritDoc}
2014-06-11 12:33:15 -04:00
*/
public function stop() {
uv_stop($this->loop);
$this->isRunning = false;
2014-06-11 12:33:15 -04:00
}
/**
2015-03-19 11:14:21 -04:00
* {@inheritDoc}
2014-06-11 12:33:15 -04:00
*/
public function immediately(callable $callback, array $options = []): string {
// @TODO Replace stdclass with anon class once merged into php-src/master
$watcher = new \StdClass;
/*
$watcher = new class extends Watcher {
// Inherited:
// public $id;
// public $type;
// public $isEnabled;
// public $callback;
// public $callbackData;
public $msDelay;
public $nextExecutionAt;
}
*/
$watcher->id = $watcherId = $this->lastWatcherId++;
$watcher->type = Watcher::IMMEDIATE;
$watcher->callback = $callback;
$watcher->callbackData = $options["callbackData"] ?? null;
$watcher->isEnabled = $options["enable"] ?? true;
if ($watcher->isEnabled) {
$this->enabledWatcherCount++;
$this->immediates[$watcherId] = $watcher;
}
$this->watchers[$watcherId] = $watcher;
return $watcherId;
2014-06-11 12:33:15 -04:00
}
/**
2015-03-19 11:14:21 -04:00
* {@inheritDoc}
2014-06-11 12:33:15 -04:00
*/
public function once(callable $callback, int $msDelay, array $options = []): string {
assert(($msDelay >= 0), "\$msDelay at Argument 2 expects integer >= 0");
return $this->registerTimer($callback, $msDelay, $msInterval = -1, $options);
2014-06-11 12:33:15 -04:00
}
/**
2015-03-19 11:14:21 -04:00
* {@inheritDoc}
2014-06-11 12:33:15 -04:00
*/
public function repeat(callable $callback, int $msInterval, array $options = []): string {
assert(($msInterval >= 0), "\$msInterval at Argument 2 expects integer >= 0");
$msDelay = $options["msDelay"] ?? $msInterval;
assert(($msDelay >= 0), "msDelay option expects integer >= 0");
// libuv interprets a zero interval as "non-repeating." Because we support
// zero-time repeat intervals in our other event reactors we hack in support
// for this by assigning a 1ms interval when zero is passed by the user.
if ($msInterval === 0) {
$msInterval = 1;
}
return $this->registerTimer($callback, $msDelay, $msInterval, $options);
2014-06-11 12:33:15 -04:00
}
private function registerTimer(callable $callback, int $msDelay, int $msInterval, array $options): string {
$this->enabledWatcherCount++;
// @TODO Replace stdclass with anon class once merged into php-src/master
$watcher = new \StdClass;
/*
$watcher = new class extends Watcher {
// Inherited:
// public $id;
// public $type;
// public $callback;
// public $callbackData;
// public $isEnabled;
public $uvHandle;
public $msDelay;
public $msInterval;
};
*/
$isRepeating = ($msInterval !== -1);
$watcher->id = $watcherId = $this->lastWatcherId++;
$watcher->type = ($isRepeating) ? Watcher::TIMER_ONCE : Watcher::TIMER_REPEAT;
$watcher->uvHandle = uv_timer_init($this->loop);
2014-06-11 12:33:15 -04:00
$watcher->callback = $this->wrapTimerCallback($watcher, $callback);
$watcher->callbackData = $options["callbackData"] ?? null;
$watcher->isEnabled = $options["enable"] ?? true;
$watcher->msDelay = $msDelay;
$watcher->msInterval = $isRepeating ? $msInterval : 0;
2014-06-11 12:33:15 -04:00
$this->watchers[$watcherId] = $watcher;
2014-06-11 12:33:15 -04:00
if ($watcher->isEnabled) {
uv_timer_start($watcher->uvHandle, $watcher->msDelay, $watcher->msInterval, $watcher->callback);
}
2014-06-11 12:33:15 -04:00
return $watcher->id;
}
2015-03-19 11:14:21 -04:00
private function wrapTimerCallback($watcher, $callback): \Closure {
2014-06-11 12:33:15 -04:00
return function() use ($watcher, $callback) {
try {
$watcherId = $watcher->id;
$result = ($callback)($this, $watcherId, $watcher->callbackData);
2014-09-22 16:47:48 -04:00
if ($result instanceof \Generator) {
resolve($result, $this)->when($this->onCoroutineResolution);
2014-09-22 16:47:48 -04:00
}
2015-01-18 20:13:08 -05:00
// The isset() check is necessary because the "once" timer
// callback may have cancelled itself when it was invoked.
if ($watcher->type === Watcher::TIMER_ONCE && isset($this->watchers[$watcherId])) {
$this->clearWatcher($watcherId);
2014-06-11 12:33:15 -04:00
}
} catch (\Exception $e) {
$this->handleRunError($e);
}
};
}
private function handleRunError(\Exception $e) {
try {
if (empty($this->onError)) {
2014-06-11 12:33:15 -04:00
$this->stopException = $e;
$this->stop();
} else {
($this->onCoroutineResolution)($e);
2014-06-11 12:33:15 -04:00
}
} catch (\Exception $e) {
$this->stopException = $e;
$this->stop();
}
2014-06-11 12:33:15 -04:00
}
/**
2015-03-19 11:14:21 -04:00
* {@inheritDoc}
2014-06-11 12:33:15 -04:00
*/
public function onReadable($stream, callable $callback, array $options = []): string {
return $this->watchStream($stream, $callback, Watcher::IO_READER, $options);
2014-06-11 12:33:15 -04:00
}
/**
2015-03-19 11:14:21 -04:00
* {@inheritDoc}
2014-06-11 12:33:15 -04:00
*/
public function onWritable($stream, callable $callback, array $options = []): string {
return $this->watchStream($stream, $callback, Watcher::IO_WRITER, $options);
2014-06-11 12:33:15 -04:00
}
private function watchStream($stream, callable $callback, int $type, array $options): string {
// @TODO Replace stdclass with anon class once merged into php-src/master
$watcher = new \StdClass;
/*
$watcher = new class extends Watcher {
// Inherited:
// public $id;
// public $type;
// public $callback;
// public $callbackData;
// public $isEnabled;
public $poll;
public $stream;
public $streamId;
};
*/
$this->watchers[$watcherId] = $watcher;
$watcher->id = $watcherId = $this->lastWatcherId++;
$watcher->type = $type;
$watcher->callback = $callback;
$watcher->callbackData = $options["callbackData"] ?? null;
$watcher->isEnabled = $options["enable"] ?? true;
$watcher->stream = $stream;
$watcher->streamId = $streamId = (int) $stream;
$watcher->poll = $poll = isset($this->streamIdPollMap[$streamId])
? $this->streamIdPollMap[$streamId]
: $this->makePollHandle($stream);
if (!$watcher->isEnabled) {
$poll->disable[$watcherId] = $watcher;
// If the poll is disabled we don't need to do anything else
return $watcherId;
}
2014-06-11 12:33:15 -04:00
$this->enabledWatcherCount++;
if ($type === Watcher::IO_READER) {
$poll->readers[$watcherId] = $watcher;
2014-06-11 12:33:15 -04:00
} else {
$poll->writers[$watcherId] = $watcher;
2014-06-11 12:33:15 -04:00
}
2014-11-12 13:17:08 -05:00
$newFlags = 0;
if ($poll->readers) {
2014-11-12 13:17:08 -05:00
$newFlags |= \UV::READABLE;
}
if ($poll->writers) {
2014-11-12 13:17:08 -05:00
$newFlags |= \UV::WRITABLE;
}
2014-11-12 13:17:08 -05:00
if ($newFlags != $poll->flags) {
$poll->flags = $newFlags;
uv_poll_start($poll->handle, $newFlags, $poll->callback);
}
2014-06-11 12:33:15 -04:00
return $watcherId;
}
2014-06-11 12:33:15 -04:00
private function makePollHandle($stream) {
// Windows needs the socket-specific init function, so make sure we use
// it when dealing with tcp/ssl streams.
$pollInitFunc = $this->isWindows
? $this->chooseWindowsPollingFunction($stream)
: 'uv_poll_init';
2014-06-11 12:33:15 -04:00
$streamId = (int) $stream;
// @TODO Replace stdclass with anon class once merged into php-src/master
$poll = new \StdClass;
/*
$poll = new class {
use Struct;
public $flags;
public $handle;
public $callback;
public $readers = [];
public $writers = [];
public $disable = [];
};
*/
$this->streamIdPollMap[$streamId] = $poll;
$poll->flags = 0;
$poll->handle = $pollInitFunc($this->loop, $stream);
$poll->callback = function($uvHandle, $stat, $events) use ($poll) {
if ($events & \UV::READABLE) {
foreach ($poll->readers as $watcher) {
$this->invokePollWatcher($watcher);
}
}
if ($events & \UV::WRITABLE) {
foreach ($poll->writers as $watcher) {
$this->invokePollWatcher($watcher);
}
}
};
2014-06-11 12:33:15 -04:00
return $poll;
2014-06-11 12:33:15 -04:00
}
2015-03-19 11:14:21 -04:00
private function chooseWindowsPollingFunction($stream): string {
$streamType = stream_get_meta_data($stream)['stream_type'];
return ($streamType === 'tcp_socket/ssl' || $streamType === 'tcp_socket')
? 'uv_poll_init_socket'
: 'uv_poll_init';
}
private function invokePollWatcher($watcher) {
try {
$result = ($watcher->callback)($this, $watcher->id, $watcher->stream, $watcher->callbackData);
if ($result instanceof \Generator) {
resolve($result, $this)->when($this->onCoroutineResolution);
2014-06-11 12:33:15 -04:00
}
} catch (\Exception $e) {
$this->handleRunError($e);
}
2014-06-11 12:33:15 -04:00
}
2014-08-06 00:17:03 -04:00
/**
2015-03-19 11:14:21 -04:00
* {@inheritDoc}
2014-08-06 00:17:03 -04:00
*/
public function onSignal(int $signo, callable $func, array $options = []): string {
// @TODO Replace stdclass with anon class once merged into php-src/master
$watcher = new \StdClass;
/*
$watcher = new class extends Watcher {
// Inherited:
// public $id;
// public $type;
// public $callback;
// public $callbackData;
// public $isEnabled;
public $signo;
public $uvHandle;
};
*/
$watcher->id = $watcherId = $this->lastWatcherId++;
$watcher->type = Watcher::SIGNAL;
2015-03-19 11:14:21 -04:00
$watcher->callback = $this->wrapSignalCallback($watcher, $func);
$watcher->callbackData = $options["callbackData"] ?? null;
$watcher->isEnabled = $options["enable"] ?? true;
$watcher->signo = $signo;
$watcher->uvHandle = uv_signal_init($this->loop);
2014-08-06 00:17:03 -04:00
if ($watcher->isEnabled) {
$this->enabledWatcherCount++;
uv_signal_start($watcher->uvHandle, $watcher->callback, $watcher->signo);
}
2014-08-06 00:17:03 -04:00
$this->watchers[$watcherId] = $watcher;
2014-08-06 00:17:03 -04:00
return $watcherId;
2014-08-06 00:17:03 -04:00
}
2015-03-19 11:14:21 -04:00
private function wrapSignalCallback($watcher, $callback): \Closure {
2014-08-06 00:17:03 -04:00
return function() use ($watcher, $callback) {
try {
$result = ($callback)($this, $watcher->id, $watcher->signo, $watcher->callbackData);
2014-09-22 16:47:48 -04:00
if ($result instanceof \Generator) {
resolve($result, $this)->when($this->onCoroutineResolution);
2014-09-22 16:47:48 -04:00
}
2014-08-06 00:17:03 -04:00
} catch (\Exception $e) {
$this->handleRunError($e);
2014-08-06 00:17:03 -04:00
}
};
}
2014-06-11 12:33:15 -04:00
/**
2015-03-19 11:14:21 -04:00
* {@inheritDoc}
2014-06-11 12:33:15 -04:00
*/
2015-03-19 11:14:21 -04:00
public function cancel(string $watcherId) {
2014-06-11 12:33:15 -04:00
if (isset($this->watchers[$watcherId])) {
$this->clearWatcher($watcherId);
}
}
2015-03-19 11:14:21 -04:00
private function clearWatcher(string $watcherId) {
2014-06-11 12:33:15 -04:00
$watcher = $this->watchers[$watcherId];
unset($this->watchers[$watcherId]);
if ($watcher->isEnabled) {
$this->enabledWatcherCount--;
switch ($watcher->type) {
case Watcher::IO_READER:
// fallthrough
case Watcher::IO_WRITER:
$this->clearPollFromWatcher($watcher);
2014-08-06 00:17:03 -04:00
break;
case Watcher::SIGNAL:
uv_signal_stop($watcher->uvHandle);
2014-08-06 00:17:03 -04:00
break;
case Watcher::IMMEDIATE:
unset($this->immediates[$watcherId]);
break;
case Watcher::TIMER_ONCE:
// we don't have to actually stop once timers
break;
2014-08-06 00:17:03 -04:00
default:
uv_timer_stop($watcher->uvHandle);
2014-08-06 00:17:03 -04:00
break;
}
2014-06-11 12:33:15 -04:00
}
}
private function clearPollFromWatcher($watcher) {
$poll = $watcher->poll;
$watcherId = $watcher->id;
unset(
$poll->readers[$watcherId],
$poll->writers[$watcherId],
$poll->disable[$watcherId]
);
// If any watchers are still enabled for this stream we're finished here
$hasEnabledWatchers = ((int) $poll->readers) + ((int) $poll->writers);
if ($hasEnabledWatchers) {
return;
}
// Always stop polling if no enabled watchers remain
uv_poll_stop($poll->handle);
// If all watchers are disabled we can pull out here
$hasDisabledWatchers = (bool) $poll->disable;
if ($hasDisabledWatchers) {
return;
}
// Otherwise there are no watchers left for this poll and we should clear it
$streamId = (int) $watcher->stream;
unset($this->streamIdPollMap[$streamId]);
}
2014-06-11 12:33:15 -04:00
/**
2015-03-19 11:14:21 -04:00
* {@inheritDoc}
2014-06-11 12:33:15 -04:00
*/
2015-03-19 11:14:21 -04:00
public function disable(string $watcherId) {
2014-06-11 12:33:15 -04:00
if (!isset($this->watchers[$watcherId])) {
return;
}
$watcher = $this->watchers[$watcherId];
2014-08-06 00:17:03 -04:00
if (!$watcher->isEnabled) {
return;
}
switch ($watcher->type) {
case Watcher::IO_READER:
// fallthrough
case Watcher::IO_WRITER:
$this->disablePollFromWatcher($watcher);
2014-08-06 00:17:03 -04:00
break;
case Watcher::SIGNAL:
uv_signal_stop($watcher->uvHandle);
2014-08-06 00:17:03 -04:00
break;
case Watcher::IMMEDIATE:
unset($this->immediates[$watcherId]);
break;
case Watcher::TIMER_ONCE:
// fallthrough
case Watcher::TIMER_REPEAT:
uv_timer_stop($watcher->uvHandle);
2014-08-06 00:17:03 -04:00
break;
default:
assert(false, "Unexpected Watcher type encountered");
2014-06-11 12:33:15 -04:00
}
2014-08-06 00:17:03 -04:00
$watcher->isEnabled = false;
$this->enabledWatcherCount--;
2014-06-11 12:33:15 -04:00
}
private function disablePollFromWatcher($watcher) {
$poll = $watcher->poll;
$watcherId = $watcher->id;
2014-11-12 13:17:08 -05:00
unset(
$poll->readers[$watcherId],
$poll->writers[$watcherId]
);
$poll->disable[$watcherId] = $watcher;
2014-11-12 13:17:08 -05:00
if (!($poll->readers || $poll->writers)) {
uv_poll_stop($poll->handle);
2014-11-12 13:17:08 -05:00
return;
}
// If we're still here we may need to update the polling flags
$newFlags = 0;
if ($poll->readers) {
$newFlags |= \UV::READABLE;
}
if ($poll->writers) {
$newFlags |= \UV::WRITABLE;
}
if ($poll->flags != $newFlags) {
$poll->flags = $newFlags;
uv_poll_start($poll->handle, $newFlags, $poll->callback);
}
}
2014-06-11 12:33:15 -04:00
/**
2015-03-19 11:14:21 -04:00
* {@inheritDoc}
2014-06-11 12:33:15 -04:00
*/
2015-03-19 11:14:21 -04:00
public function enable(string $watcherId) {
2014-06-11 12:33:15 -04:00
if (!isset($this->watchers[$watcherId])) {
return;
}
$watcher = $this->watchers[$watcherId];
if ($watcher->isEnabled) {
return;
}
switch ($watcher->type) {
case Watcher::TIMER_ONCE: // fallthrough
case Watcher::TIMER_REPEAT:
uv_timer_start($watcher->uvHandle, $watcher->msDelay, $watcher->msInterval, $watcher->callback);
break;
case Watcher::IO_READER: // fallthrough
case Watcher::IO_WRITER:
$this->enablePollFromWatcher($watcher);
2014-08-06 00:17:03 -04:00
break;
case Watcher::SIGNAL:
uv_signal_start($watcher->uvHandle, $watcher->callback, $watcher->signo);
2014-08-06 00:17:03 -04:00
break;
case Watcher::IMMEDIATE:
$this->immediates[$watcherId] = $watcher;
break;
2014-08-06 00:17:03 -04:00
default:
assert(false, "Unexpected Watcher type encountered");
2014-06-11 12:33:15 -04:00
}
2014-08-06 00:17:03 -04:00
$watcher->isEnabled = true;
$this->enabledWatcherCount++;
2014-06-11 12:33:15 -04:00
}
private function enablePollFromWatcher($watcher) {
$poll = $watcher->poll;
$watcherId = $watcher->id;
unset($poll->disable[$watcherId]);
if ($watcher->type === Watcher::IO_READER) {
$poll->flags |= \UV::READABLE;
$poll->readers[$watcherId] = $watcher;
} else {
$poll->flags |= \UV::WRITABLE;
$poll->writers[$watcherId] = $watcher;
}
@uv_poll_start($poll->handle, $poll->flags, $poll->callback);
}
/**
* Access the underlying php-uv extension loop resource
*
* This method exists outside the base Reactor API. It provides access to the underlying php-uv
* event loop resource for code that wishes to interact with lower-level php-uv extension
* functionality.
*
* @return resource
*/
public function getUnderlyingLoop() {
return $this->loop;
}
/**
2015-03-19 11:14:21 -04:00
* {@inheritDoc}
*/
2015-03-19 11:14:21 -04:00
public function onError(callable $func) {
$this->onError = $func;
}
public function __destruct() {
self::$instanceCount--;
}
public function __debugInfo() {
$immediates = $timers = $readers = $writers = $signals = $disabled = 0;
foreach ($this->watchers as $watcher) {
switch ($watcher->type) {
case Watcher::IMMEDIATE:
$immediates++;
break;
case Watcher::TIMER_ONCE:
case Watcher::TIMER_REPEAT:
$timers++;
break;
case Watcher::IO_READER:
$readers++;
break;
case Watcher::IO_WRITER:
$writers++;
break;
case Watcher::SIGNAL:
$signals++;
break;
default:
throw new \DomainException(
"Unexpected watcher type: {$watcher->type}"
);
}
$disabled += !$watcher->isEnabled;
}
return [
'timers' => $timers,
'immediates' => $immediates,
'io_readers' => $readers,
'io_writers' => $writers,
'signals' => $signals,
'disabled' => $disabled,
'last_watcher_id' => $this->lastWatcherId,
'instances' => self::$instanceCount,
];
}
}