1
0
mirror of https://github.com/danog/amp.git synced 2024-12-13 18:07:30 +01:00
amp/lib/NativeReactor.php

438 lines
14 KiB
PHP
Raw Normal View History

2013-08-05 22:05:08 +02:00
<?php
2014-09-23 04:38:32 +02:00
namespace Amp;
2013-08-05 22:05:08 +02:00
2015-03-16 20:00:10 +01:00
class NativeReactor implements Reactor {
2013-08-05 22:05:08 +02:00
/** @var array Timer watchers by watcher ID; each entry is [callback, nextExecution, delay, isRepeating]. */
private $alarms = [];

/** @var callable[] Callbacks queued for the next tick, keyed by watcher ID. */
private $immediates = [];

/** @var array Next execution timestamp (microtime float) of each scheduled timer, keyed by watcher ID. */
private $alarmOrder = [];

/** @var resource[] Streams currently watched for readability, keyed by integer stream ID. */
private $readStreams = [];

/** @var resource[] Streams currently watched for writability, keyed by integer stream ID. */
private $writeStreams = [];

/** @var array Readability callbacks, indexed as [streamId][watcherId]. */
private $readCallbacks = [];

/** @var array Writability callbacks, indexed as [streamId][watcherId]. */
private $writeCallbacks = [];

/** @var int[] Maps an enabled read watcher ID to its stream ID. */
private $watcherIdReadStreamIdMap = [];

/** @var int[] Maps an enabled write watcher ID to its stream ID. */
private $watcherIdWriteStreamIdMap = [];

/** @var array Parked watchers by watcher ID; each entry is [DISABLED_* type tag, saved watcher data]. */
private $disabledWatchers = [];

/** @var int Divisor applied to millisecond delays to obtain seconds. */
private $resolution = 1000;

/** @var string Next watcher ID to hand out; advanced with the ++ operator after each use. */
private $lastWatcherId = "a";

/** @var bool True between run() and stop(). */
private $isRunning = false;

/** @var bool True while a tick() iteration is in progress; cleared by stop(). */
private $isTicking = false;

/** @var callable|null Handler registered through onError(), if any. */
private $onError;

/** @var \Closure Failure handler attached to generator-returning callbacks; created in the constructor. */
private $onCallbackResolution;

/** @var int Number of live NativeReactor instances (incremented on construct, decremented on destruct). */
private static $instanceCount = 0;

// Type tags stored alongside parked watchers in $disabledWatchers.
private static $DISABLED_ALARM = 0;
private static $DISABLED_READ = 1;
private static $DISABLED_WRITE = 2;
private static $DISABLED_IMMEDIATE = 3;

/** @var int Microseconds per second, used to convert float seconds for usleep()/stream_select(). */
private static $MICROSECOND = 1000000;
2013-08-05 22:05:08 +02:00
2014-09-22 22:47:48 +02:00
/**
 * Registers the new instance in the live-instance counter and builds the
 * failure handler used for every callback outcome.
 *
 * The handler ignores empty errors, forwards a non-empty error to the
 * callable registered via onError() when there is one, and otherwise
 * rethrows the error.
 */
public function __construct() {
    self::$instanceCount += 1;

    $this->onCallbackResolution = function($e = null, $r = null) {
        if (empty($e)) {
            return;
        }

        $handler = $this->onError;
        if ($handler) {
            $handler($e);
            return;
        }

        throw $e;
    };
}
/**
 * Summarises the reactor state for var_dump()-style debugging output.
 *
 * @return array Watcher counts per category, the next watcher ID and the
 *               number of live reactor instances.
 */
public function __debugInfo() {
    $summary = [];

    $summary['timers'] = count($this->alarms);
    $summary['immediates'] = count($this->immediates);
    $summary['io_readers'] = count($this->readStreams);
    $summary['io_writers'] = count($this->writeStreams);
    $summary['disabled'] = count($this->disabledWatchers);
    $summary['last_watcher_id'] = $this->lastWatcherId;
    $summary['instances'] = self::$instanceCount;

    return $summary;
}
/**
 * Removes this instance from the live-instance counter.
 */
public function __destruct() {
    self::$instanceCount -= 1;
}
2014-08-08 18:26:08 +02:00
/**
 * {@inheritDoc}
 *
 * Does nothing when the reactor is already running. Otherwise marks the
 * reactor as running, queues $onStart (if given) as an immediate, assigns
 * deadlines to timers registered while stopped, and ticks until the reactor
 * has been stopped and no immediates remain queued.
 *
 * @param callable $onStart Optional callback queued to run on the first tick.
 */
public function run(callable $onStart = null) {
    if ($this->isRunning) {
        return;
    }

    $this->isRunning = true;

    if ($onStart) {
        $this->immediately($onStart);
    }

    $this->enableAlarms();

    while (true) {
        // Keep ticking after stop() until the immediates queue has drained.
        if (!$this->isRunning && !$this->immediates) {
            break;
        }
        $this->tick();
    }
}
/**
 * Gives every timer that has no deadline yet a first execution time of
 * "now + its delay" and enters it into the alarm schedule.
 *
 * Timers that already carry a deadline are left untouched.
 */
private function enableAlarms() {
    $startedAt = microtime(true);

    foreach ($this->alarms as $id => $struct) {
        if ($struct[1]) {
            continue;
        }

        // Index 1 is the next execution time, index 2 the delay in seconds.
        $struct[1] = $startedAt + $struct[2];
        $this->alarms[$id] = $struct;
        $this->alarmOrder[$id] = $struct[1];
    }
}
2014-08-08 18:26:08 +02:00
/**
 * {@inheritDoc}
 *
 * Clears both the running flag and the ticking flag, so run() stops looping
 * and an in-progress tick() can bail out after its immediates have run.
 */
public function stop() {
    $this->isTicking = false;
    $this->isRunning = false;
}
2014-08-08 18:26:08 +02:00
/**
 * {@inheritDoc}
 *
 * Executes a single loop iteration: queued immediates run first, then ready
 * streams are dispatched (or the loop sleeps until the next timer is due),
 * and finally due timers are executed. When nothing at all is left to watch
 * the reactor stops itself.
 *
 * Any \Exception raised during the iteration is handed to the callback
 * failure handler, which forwards it to the onError() callable or rethrows
 * it when none is registered.
 *
 * @param bool $noWait When true the iteration does not block waiting for
 *                     timers or stream activity.
 */
public function tick(bool $noWait = false) {
    try {
        $this->isTicking = true;

        // Timers registered while the reactor was stopped have no deadline yet.
        if (!$this->isRunning) {
            $this->enableAlarms();
        }

        // Iterate over a snapshot so immediates registered by these callbacks
        // wait for the next tick. Each entry is removed from the live queue
        // only right before it runs: that way cancel()/disable() issued by an
        // earlier callback still take effect, and a throwing callback does
        // not discard the immediates queued behind it.
        $pendingImmediates = $this->immediates;
        foreach ($pendingImmediates as $watcherId => $callback) {
            if (!isset($this->immediates[$watcherId])) {
                continue;
            }
            unset($this->immediates[$watcherId]);

            $result = $callback($this, $watcherId);
            if ($result instanceof \Generator) {
                resolve($result, $this)->when($this->onCallbackResolution);
            }
        }

        // If an immediately watcher called stop() then pull out here
        if (!$this->isTicking) {
            return;
        }

        if ($this->immediates) {
            // More immediates are waiting: do not block at all.
            $timeToNextAlarm = 0;
        } elseif ($this->alarmOrder) {
            $timeToNextAlarm = $noWait ? 0 : round(min($this->alarmOrder) - microtime(true), 4);
        } else {
            $timeToNextAlarm = $noWait ? 0 : 1;
        }

        if ($this->readStreams || $this->writeStreams) {
            $this->selectActionableStreams($timeToNextAlarm);
        } elseif (!($this->alarmOrder || $this->immediates)) {
            // Nothing left to watch.
            $this->stop();
        } elseif ($timeToNextAlarm > 0) {
            // usleep() takes an integer; cast explicitly instead of relying on
            // an implicit (and, since PHP 8.1, deprecated) float conversion.
            usleep((int) ($timeToNextAlarm * self::$MICROSECOND));
        }

        if ($this->alarmOrder) {
            $this->executeAlarms();
        }

        $this->isTicking = false;
    } catch (\Exception $error) {
        $errorHandler = $this->onCallbackResolution;
        $errorHandler($error);
    }
}
/**
 * Waits up to $timeout seconds for any watched stream to become readable or
 * writable and invokes the callbacks registered for each ready stream.
 *
 * Nothing is dispatched when stream_select() fails or reports no ready
 * streams. A watcher that was cancelled or disabled by a callback earlier in
 * the same pass is skipped instead of being invoked.
 *
 * @param int|float $timeout Maximum wait in seconds; values <= 0 poll without blocking.
 */
private function selectActionableStreams($timeout) {
    $r = $this->readStreams;
    $w = $this->writeStreams;
    $e = null;

    if ($timeout <= 0) {
        $sec = 0;
        $usec = 0;
    } else {
        // stream_select() takes integer seconds/microseconds; cast explicitly
        // rather than passing fractional floats.
        $sec = (int) floor($timeout);
        $usec = (int) (($timeout - $sec) * self::$MICROSECOND);
    }

    if (!@stream_select($r, $w, $e, $sec, $usec)) {
        return;
    }

    foreach ($r as $readableStream) {
        $streamId = (int) $readableStream;
        $watchers = $this->readCallbacks[$streamId] ?? [];
        foreach ($watchers as $watcherId => $callback) {
            // The snapshot may contain watchers removed by an earlier callback.
            if (!isset($this->readCallbacks[$streamId][$watcherId])) {
                continue;
            }
            $result = $callback($this, $watcherId, $readableStream);
            if ($result instanceof \Generator) {
                resolve($result, $this)->when($this->onCallbackResolution);
            }
        }
    }

    foreach ($w as $writableStream) {
        $streamId = (int) $writableStream;
        $watchers = $this->writeCallbacks[$streamId] ?? [];
        foreach ($watchers as $watcherId => $callback) {
            // The snapshot may contain watchers removed by an earlier callback.
            if (!isset($this->writeCallbacks[$streamId][$watcherId])) {
                continue;
            }
            $result = $callback($this, $watcherId, $writableStream);
            if ($result instanceof \Generator) {
                resolve($result, $this)->when($this->onCallbackResolution);
            }
        }
    }
}
/**
 * Runs every timer whose deadline has passed, earliest deadline first.
 *
 * Repeating timers are rescheduled at "previous deadline + interval" before
 * their callback runs; one-shot timers are removed before their callback
 * runs. A timer that an earlier callback in the same pass cancelled or
 * disabled is skipped.
 */
private function executeAlarms() {
    $now = microtime(true);

    asort($this->alarmOrder);

    // Work on a snapshot: callbacks may add, cancel or disable timers.
    $schedule = $this->alarmOrder;

    foreach ($schedule as $watcherId => $executionCutoff) {
        if ($executionCutoff > $now) {
            // Sorted ascending, so nothing further is due yet.
            break;
        }

        // Without this guard a timer removed by a previous callback would
        // yield a null callback below and crash the loop.
        if (!isset($this->alarms[$watcherId])) {
            continue;
        }

        list($callback, $nextExecution, $interval, $isRepeating) = $this->alarms[$watcherId];

        if ($isRepeating) {
            $nextExecution += $interval;
            $this->alarms[$watcherId] = [$callback, $nextExecution, $interval, $isRepeating];
            $this->alarmOrder[$watcherId] = $nextExecution;
        } else {
            unset(
                $this->alarms[$watcherId],
                $this->alarmOrder[$watcherId]
            );
        }

        $result = $callback($this, $watcherId);
        if ($result instanceof \Generator) {
            resolve($result, $this)->when($this->onCallbackResolution);
        }
    }
}
2014-08-08 18:26:08 +02:00
/**
 * {@inheritDoc}
 *
 * Queues $callback to be invoked by the next tick() and returns the watcher
 * ID under which it was queued.
 *
 * @param callable $callback Invoked with the reactor and the watcher ID.
 * @return string The new watcher ID.
 */
public function immediately(callable $callback): string {
    $id = $this->lastWatcherId;
    $this->lastWatcherId++;

    $this->immediates[$id] = $callback;

    return $id;
}
2014-08-08 18:26:08 +02:00
/**
 * {@inheritDoc}
 *
 * Schedules a one-shot timer.
 *
 * @param callable $callback Invoked when the timer fires.
 * @param int $msDelay Delay in milliseconds.
 * @return string The new watcher ID.
 */
public function once(callable $callback, int $msDelay): string {
    return $this->scheduleAlarm($callback, $msDelay, false);
}
2014-08-08 18:26:08 +02:00
/**
 * {@inheritDoc}
 *
 * Schedules a repeating timer.
 *
 * @param callable $callback Invoked every time the timer fires.
 * @param int $msDelay Interval in milliseconds.
 * @return string The new watcher ID.
 */
public function repeat(callable $callback, int $msDelay): string {
    return $this->scheduleAlarm($callback, $msDelay, true);
}
2015-03-19 16:14:21 +01:00
/**
 * Registers a timer watcher and returns its ID.
 *
 * The millisecond delay is converted to seconds (rounded to 3 decimals).
 * While the reactor is running the timer gets its deadline immediately;
 * otherwise the deadline stays null until enableAlarms() assigns one.
 *
 * @param callable $callback Invoked when the timer fires.
 * @param int $msDelay Delay (or interval) in milliseconds.
 * @param bool $isRepeating Whether the timer re-arms itself after firing.
 * @return string The new watcher ID.
 */
private function scheduleAlarm(callable $callback, int $msDelay, bool $isRepeating): string {
    $watcherId = $this->lastWatcherId++;
    $delayInSeconds = round(($msDelay / $this->resolution), 3);

    $nextExecution = null;
    if ($this->isRunning) {
        $nextExecution = (microtime(true) + $delayInSeconds);
        $this->alarmOrder[$watcherId] = $nextExecution;
    }

    $this->alarms[$watcherId] = [$callback, $nextExecution, $delayInSeconds, $isRepeating];

    return $watcherId;
}
2014-08-08 18:26:08 +02:00
/**
 * {@inheritDoc}
 *
 * Registers $callback for readability events on $stream. With $enableNow
 * false the watcher is parked in the disabled list until enable() is called.
 *
 * @param resource $stream Stream to watch.
 * @param callable $callback Invoked with the reactor, the watcher ID and the stream.
 * @param bool $enableNow Whether the watcher starts out enabled.
 * @return string The new watcher ID.
 */
public function onReadable($stream, callable $callback, bool $enableNow = true): string {
    $watcherId = (string) $this->lastWatcherId++;

    if (!$enableNow) {
        $this->disabledWatchers[$watcherId] = [self::$DISABLED_READ, [$stream, $callback]];
        return $watcherId;
    }

    $streamId = (int) $stream;
    $this->watcherIdReadStreamIdMap[$watcherId] = $streamId;
    $this->readCallbacks[$streamId][$watcherId] = $callback;
    $this->readStreams[$streamId] = $stream;

    return $watcherId;
}
2014-08-08 18:26:08 +02:00
/**
 * {@inheritDoc}
 *
 * Registers $callback for writability events on $stream. With $enableNow
 * false the watcher is parked in the disabled list until enable() is called.
 *
 * @param resource $stream Stream to watch.
 * @param callable $callback Invoked with the reactor, the watcher ID and the stream.
 * @param bool $enableNow Whether the watcher starts out enabled.
 * @return string The new watcher ID.
 */
public function onWritable($stream, callable $callback, bool $enableNow = true): string {
    $watcherId = (string) $this->lastWatcherId++;

    if (!$enableNow) {
        $this->disabledWatchers[$watcherId] = [self::$DISABLED_WRITE, [$stream, $callback]];
        return $watcherId;
    }

    $streamId = (int) $stream;
    $this->watcherIdWriteStreamIdMap[$watcherId] = $streamId;
    $this->writeCallbacks[$streamId][$watcherId] = $callback;
    $this->writeStreams[$streamId] = $stream;

    return $watcherId;
}
2014-08-08 18:26:08 +02:00
/**
 * {@inheritDoc}
 *
 * Removes the watcher from whichever category currently holds it: timers,
 * enabled read watchers, enabled write watchers, disabled watchers or queued
 * immediates. Unknown IDs are ignored.
 *
 * @param string $watcherId ID returned when the watcher was registered.
 */
public function cancel(string $watcherId) {
    if (isset($this->alarms[$watcherId])) {
        unset($this->alarms[$watcherId]);
        unset($this->alarmOrder[$watcherId]);
        return;
    }

    if (isset($this->watcherIdReadStreamIdMap[$watcherId])) {
        $this->cancelReadWatcher($watcherId);
        return;
    }

    if (isset($this->watcherIdWriteStreamIdMap[$watcherId])) {
        $this->cancelWriteWatcher($watcherId);
        return;
    }

    if (isset($this->disabledWatchers[$watcherId])) {
        unset($this->disabledWatchers[$watcherId]);
        return;
    }

    if (isset($this->immediates[$watcherId])) {
        unset($this->immediates[$watcherId]);
    }
}
2015-03-19 16:14:21 +01:00
/**
 * Drops an enabled read watcher and stops watching its stream once no read
 * callbacks remain for that stream.
 *
 * @param string $watcherId ID present in the read watcher/stream map.
 */
private function cancelReadWatcher(string $watcherId) {
    $streamId = $this->watcherIdReadStreamIdMap[$watcherId];

    unset($this->readCallbacks[$streamId][$watcherId]);
    unset($this->watcherIdReadStreamIdMap[$watcherId]);
    unset($this->disabledWatchers[$watcherId]);

    $hasRemainingCallbacks = !empty($this->readCallbacks[$streamId]);
    if (!$hasRemainingCallbacks) {
        unset($this->readStreams[$streamId]);
    }
}
2015-03-19 16:14:21 +01:00
/**
 * Drops an enabled write watcher and stops watching its stream once no write
 * callbacks remain for that stream.
 *
 * @param string $watcherId ID present in the write watcher/stream map.
 */
private function cancelWriteWatcher(string $watcherId) {
    $streamId = $this->watcherIdWriteStreamIdMap[$watcherId];

    unset($this->writeCallbacks[$streamId][$watcherId]);
    unset($this->watcherIdWriteStreamIdMap[$watcherId]);
    unset($this->disabledWatchers[$watcherId]);

    $hasRemainingCallbacks = !empty($this->writeCallbacks[$streamId]);
    if (!$hasRemainingCallbacks) {
        unset($this->writeStreams[$streamId]);
    }
}
2014-08-08 18:26:08 +02:00
/**
 * {@inheritDoc}
 *
 * Moves a parked watcher out of the disabled list and back into the
 * structures for its type (timer, read stream, write stream or immediate).
 * IDs that are not currently disabled are ignored.
 *
 * @param string $watcherId ID of a previously disabled watcher.
 */
public function enable(string $watcherId) {
    if (!isset($this->disabledWatchers[$watcherId])) {
        return;
    }

    $parked = $this->disabledWatchers[$watcherId];
    $type = $parked[0];
    $watcherStruct = $parked[1];
    unset($this->disabledWatchers[$watcherId]);

    if ($type === self::$DISABLED_ALARM) {
        // NOTE(review): a deadline saved at disable() time is reused as-is, so a
        // timer re-enabled after its deadline has passed is due immediately.
        if (!$watcherStruct[1]) {
            $watcherStruct[1] = microtime(true) + $watcherStruct[2];
        }
        $this->alarms[$watcherId] = $watcherStruct;
        $this->alarmOrder[$watcherId] = $watcherStruct[1];
        return;
    }

    if ($type === self::$DISABLED_READ) {
        $stream = $watcherStruct[0];
        $streamId = (int) $stream;
        $this->readCallbacks[$streamId][$watcherId] = $watcherStruct[1];
        $this->watcherIdReadStreamIdMap[$watcherId] = $streamId;
        $this->readStreams[$streamId] = $stream;
        return;
    }

    if ($type === self::$DISABLED_WRITE) {
        $stream = $watcherStruct[0];
        $streamId = (int) $stream;
        $this->writeCallbacks[$streamId][$watcherId] = $watcherStruct[1];
        $this->watcherIdWriteStreamIdMap[$watcherId] = $streamId;
        $this->writeStreams[$streamId] = $stream;
        return;
    }

    if ($type === self::$DISABLED_IMMEDIATE) {
        // For immediates the saved data is the callback itself.
        $this->immediates[$watcherId] = $watcherStruct;
    }
}
2014-08-08 18:26:08 +02:00
/**
 * {@inheritDoc}
 *
 * Parks an active watcher in the disabled list together with the data needed
 * to restore it later via enable(). Handles timers, read watchers, write
 * watchers and queued immediates; unknown IDs are ignored.
 *
 * @param string $watcherId ID of an active watcher.
 */
public function disable(string $watcherId) {
    if (isset($this->alarms[$watcherId])) {
        $this->disabledWatchers[$watcherId] = [self::$DISABLED_ALARM, $this->alarms[$watcherId]];
        unset($this->alarms[$watcherId]);
        unset($this->alarmOrder[$watcherId]);
        return;
    }

    if (isset($this->watcherIdReadStreamIdMap[$watcherId])) {
        $streamId = $this->watcherIdReadStreamIdMap[$watcherId];
        $parkedData = [$this->readStreams[$streamId], $this->readCallbacks[$streamId][$watcherId]];

        unset($this->readCallbacks[$streamId][$watcherId]);
        unset($this->watcherIdReadStreamIdMap[$watcherId]);

        // Stop selecting on the stream when this was its last read watcher.
        if (empty($this->readCallbacks[$streamId])) {
            unset($this->readStreams[$streamId]);
        }

        $this->disabledWatchers[$watcherId] = [self::$DISABLED_READ, $parkedData];
        return;
    }

    if (isset($this->watcherIdWriteStreamIdMap[$watcherId])) {
        $streamId = $this->watcherIdWriteStreamIdMap[$watcherId];
        $parkedData = [$this->writeStreams[$streamId], $this->writeCallbacks[$streamId][$watcherId]];

        unset($this->writeCallbacks[$streamId][$watcherId]);
        unset($this->watcherIdWriteStreamIdMap[$watcherId]);

        // Stop selecting on the stream when this was its last write watcher.
        if (empty($this->writeCallbacks[$streamId])) {
            unset($this->writeStreams[$streamId]);
        }

        $this->disabledWatchers[$watcherId] = [self::$DISABLED_WRITE, $parkedData];
        return;
    }

    if (isset($this->immediates[$watcherId])) {
        $queuedCallback = $this->immediates[$watcherId];
        $this->disabledWatchers[$watcherId] = [self::$DISABLED_IMMEDIATE, $queuedCallback];
        unset($this->immediates[$watcherId]);
    }
}
/**
 * {@inheritDoc}
 *
 * Stores the callable that receives errors raised by watcher callbacks.
 * While none is stored, such errors are rethrown by the failure handler
 * created in the constructor.
 *
 * @param callable $func Invoked with the error as its only argument.
 */
public function onError(callable $func) {
    $handler = $func;
    $this->onError = $handler;
}
2013-08-05 22:05:08 +02:00
}