mirror of
https://github.com/danog/amp.git
synced 2024-11-26 20:15:00 +01:00
Use a custom priority queue for timers
Fixes #220 by allowing immediate removal of the watcher from the queue. Insert and extract is O(log(n)), peeking is O(1), and removal is O(n).
This commit is contained in:
parent
b6fc1e12d4
commit
c6f8425473
125
lib/Loop/Internal/TimerQueue.php
Normal file
125
lib/Loop/Internal/TimerQueue.php
Normal file
@ -0,0 +1,125 @@
|
|||||||
|
<?php
|
||||||
|
|
||||||
|
namespace Amp\Loop\Internal;
|
||||||
|
|
||||||
|
use Amp\Loop\Watcher;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Uses a binary tree stored in an array to implement a heap.
|
||||||
|
*/
|
||||||
|
class TimerQueue
|
||||||
|
{
|
||||||
|
/** @var TimerQueueEntry[] */
|
||||||
|
private $data = [];
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Inserts the watcher into the queue. Time complexity: O(log(n)).
|
||||||
|
*
|
||||||
|
* @param Watcher $watcher
|
||||||
|
* @param int $expiration
|
||||||
|
*/
|
||||||
|
public function insert(Watcher $watcher, int $expiration)
|
||||||
|
{
|
||||||
|
$entry = new TimerQueueEntry;
|
||||||
|
$entry->watcher = $watcher;
|
||||||
|
$entry->expiration = $expiration;
|
||||||
|
|
||||||
|
$node = \count($this->data);
|
||||||
|
$this->data[$node] = $entry;
|
||||||
|
|
||||||
|
while ($node !== 0 && $entry->expiration < $this->data[$parent = ($node - 1) >> 1]->expiration) {
|
||||||
|
$this->data[$node] = $this->data[$parent];
|
||||||
|
$this->data[$parent] = $entry;
|
||||||
|
|
||||||
|
$node = $parent;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Removes the given watcher from the queue. Time complexity: O(n).
|
||||||
|
*
|
||||||
|
* @param Watcher $watcher
|
||||||
|
*/
|
||||||
|
public function remove(Watcher $watcher)
|
||||||
|
{
|
||||||
|
foreach ($this->data as $node => $entry) {
|
||||||
|
if ($entry->watcher === $watcher) {
|
||||||
|
$this->removeAndRebuild($node);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Deletes and returns the Watcher on top of the heap. Time complexity: O(log(n)).
|
||||||
|
*
|
||||||
|
* @return [Watcher, int] Tuple of the watcher and the expiration time.
|
||||||
|
*/
|
||||||
|
public function extract(): array
|
||||||
|
{
|
||||||
|
if ($this->isEmpty()) {
|
||||||
|
throw new \Error('No data left in the heap.');
|
||||||
|
}
|
||||||
|
|
||||||
|
$data = $this->removeAndRebuild(0);
|
||||||
|
|
||||||
|
return [$data->watcher, $data->expiration];
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @param int $node Remove the given node and then rebuild the data array from that node downward.
|
||||||
|
*
|
||||||
|
* @return TimerQueueEntry Removed entry.
|
||||||
|
*/
|
||||||
|
private function removeAndRebuild(int $node): TimerQueueEntry
|
||||||
|
{
|
||||||
|
$length = \count($this->data) - 1;
|
||||||
|
$data = $this->data[$node];
|
||||||
|
$this->data[$node] = $this->data[$length];
|
||||||
|
unset($this->data[$length]);
|
||||||
|
|
||||||
|
while (($child = ($node << 1) + 1) < $length) {
|
||||||
|
if ($this->data[$child]->expiration < $this->data[$node]->expiration
|
||||||
|
&& ($child + 1 >= $length || $this->data[$child]->expiration < $this->data[$child + 1]->expiration)
|
||||||
|
) {
|
||||||
|
// Left child is less than parent and right child.
|
||||||
|
$swap = $child;
|
||||||
|
} elseif ($child + 1 < $length && $this->data[$child + 1]->expiration < $this->data[$node]->expiration) {
|
||||||
|
// Right child is less than parent and left child.
|
||||||
|
$swap = $child + 1;
|
||||||
|
} else { // Left and right child are greater than parent.
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
$temp = $this->data[$node];
|
||||||
|
$this->data[$node] = $this->data[$swap];
|
||||||
|
$this->data[$swap] = $temp;
|
||||||
|
$node = $swap;
|
||||||
|
}
|
||||||
|
|
||||||
|
return $data;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns the value at the top of the heap (without removing it). Time complexity: O(1).
|
||||||
|
*
|
||||||
|
* @return [Watcher, int] Tuple of the watcher and the expiration time.
|
||||||
|
*/
|
||||||
|
public function peek(): array
|
||||||
|
{
|
||||||
|
if ($this->isEmpty()) {
|
||||||
|
throw new \Error('No data in the heap.');
|
||||||
|
}
|
||||||
|
|
||||||
|
return [$this->data[0]->watcher, $this->data[0]->expiration];
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Determines if the heap is empty.
|
||||||
|
* @return bool
|
||||||
|
*/
|
||||||
|
public function isEmpty(): bool
|
||||||
|
{
|
||||||
|
return empty($this->data);
|
||||||
|
}
|
||||||
|
}
|
17
lib/Loop/Internal/TimerQueueEntry.php
Normal file
17
lib/Loop/Internal/TimerQueueEntry.php
Normal file
@ -0,0 +1,17 @@
|
|||||||
|
<?php
|
||||||
|
|
||||||
|
namespace Amp\Loop\Internal;
|
||||||
|
|
||||||
|
use Amp\Loop\Watcher;
|
||||||
|
use Amp\Struct;
|
||||||
|
|
||||||
|
class TimerQueueEntry
|
||||||
|
{
|
||||||
|
use Struct;
|
||||||
|
|
||||||
|
/** @var Watcher */
|
||||||
|
public $watcher;
|
||||||
|
|
||||||
|
/** @var int */
|
||||||
|
public $expiration;
|
||||||
|
}
|
@ -22,10 +22,7 @@ class NativeDriver extends Driver
|
|||||||
/** @var \Amp\Loop\Watcher[][] */
|
/** @var \Amp\Loop\Watcher[][] */
|
||||||
private $writeWatchers = [];
|
private $writeWatchers = [];
|
||||||
|
|
||||||
/** @var int[] */
|
/** @var Internal\TimerQueue */
|
||||||
private $timerExpires = [];
|
|
||||||
|
|
||||||
/** @var \SplPriorityQueue */
|
|
||||||
private $timerQueue;
|
private $timerQueue;
|
||||||
|
|
||||||
/** @var \Amp\Loop\Watcher[][] */
|
/** @var \Amp\Loop\Watcher[][] */
|
||||||
@ -45,7 +42,7 @@ class NativeDriver extends Driver
|
|||||||
|
|
||||||
public function __construct()
|
public function __construct()
|
||||||
{
|
{
|
||||||
$this->timerQueue = new \SplPriorityQueue();
|
$this->timerQueue = new Internal\TimerQueue;
|
||||||
$this->signalHandling = \extension_loaded("pcntl");
|
$this->signalHandling = \extension_loaded("pcntl");
|
||||||
$this->nowOffset = getCurrentTime();
|
$this->nowOffset = getCurrentTime();
|
||||||
$this->now = \random_int(0, $this->nowOffset);
|
$this->now = \random_int(0, $this->nowOffset);
|
||||||
@ -97,56 +94,48 @@ class NativeDriver extends Driver
|
|||||||
$blocking ? $this->getTimeout() : 0
|
$blocking ? $this->getTimeout() : 0
|
||||||
);
|
);
|
||||||
|
|
||||||
if (!empty($this->timerExpires)) {
|
$scheduleQueue = [];
|
||||||
$scheduleQueue = [];
|
|
||||||
|
|
||||||
try {
|
try {
|
||||||
while (!$this->timerQueue->isEmpty()) {
|
while (!$this->timerQueue->isEmpty()) {
|
||||||
list($watcher, $expiration) = $this->timerQueue->top();
|
list($watcher, $expiration) = $this->timerQueue->peek();
|
||||||
|
|
||||||
$id = $watcher->id;
|
if ($expiration > $this->now()) { // Timer at top of queue has not expired.
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
if (!isset($this->timerExpires[$id]) || $expiration !== $this->timerExpires[$id]) {
|
$this->timerQueue->extract();
|
||||||
$this->timerQueue->extract(); // Timer was removed from queue.
|
|
||||||
|
if ($watcher->type & Watcher::REPEAT) {
|
||||||
|
$expiration = $this->now() + $watcher->value;
|
||||||
|
$scheduleQueue[] = [$watcher, $expiration];
|
||||||
|
} else {
|
||||||
|
$this->cancel($watcher->id);
|
||||||
|
}
|
||||||
|
|
||||||
|
try {
|
||||||
|
// Execute the timer.
|
||||||
|
$result = ($watcher->callback)($watcher->id, $watcher->data);
|
||||||
|
|
||||||
|
if ($result === null) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
if ($this->timerExpires[$id] > $this->now()) { // Timer at top of queue has not expired.
|
if ($result instanceof \Generator) {
|
||||||
break;
|
$result = new Coroutine($result);
|
||||||
}
|
}
|
||||||
|
|
||||||
$this->timerQueue->extract();
|
if ($result instanceof Promise || $result instanceof ReactPromise) {
|
||||||
|
rethrow($result);
|
||||||
if ($watcher->type & Watcher::REPEAT) {
|
|
||||||
$expiration = $this->now() + $watcher->value;
|
|
||||||
$this->timerExpires[$watcher->id] = $expiration;
|
|
||||||
$scheduleQueue[] = [$watcher, $expiration];
|
|
||||||
} else {
|
|
||||||
$this->cancel($id);
|
|
||||||
}
|
|
||||||
|
|
||||||
try {
|
|
||||||
// Execute the timer.
|
|
||||||
$result = ($watcher->callback)($id, $watcher->data);
|
|
||||||
|
|
||||||
if ($result === null) {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
if ($result instanceof \Generator) {
|
|
||||||
$result = new Coroutine($result);
|
|
||||||
}
|
|
||||||
|
|
||||||
if ($result instanceof Promise || $result instanceof ReactPromise) {
|
|
||||||
rethrow($result);
|
|
||||||
}
|
|
||||||
} catch (\Throwable $exception) {
|
|
||||||
$this->error($exception);
|
|
||||||
}
|
}
|
||||||
|
} catch (\Throwable $exception) {
|
||||||
|
$this->error($exception);
|
||||||
}
|
}
|
||||||
} finally {
|
}
|
||||||
foreach ($scheduleQueue as $item) {
|
} finally {
|
||||||
$this->timerQueue->insert($item, -$item[1]);
|
foreach ($scheduleQueue as list($watcher, $expiration)) {
|
||||||
|
if ($watcher->enabled) {
|
||||||
|
$this->timerQueue->insert($watcher, $expiration);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -266,24 +255,15 @@ class NativeDriver extends Driver
|
|||||||
*/
|
*/
|
||||||
private function getTimeout(): int
|
private function getTimeout(): int
|
||||||
{
|
{
|
||||||
while (!$this->timerQueue->isEmpty()) {
|
if (!$this->timerQueue->isEmpty()) {
|
||||||
list($watcher, $expiration) = $this->timerQueue->top();
|
list($watcher, $expiration) = $this->timerQueue->peek();
|
||||||
|
|
||||||
$id = $watcher->id;
|
$expiration -= getCurrentTime() - $this->nowOffset;
|
||||||
|
|
||||||
if (!isset($this->timerExpires[$id]) || $expiration !== $this->timerExpires[$id]) {
|
|
||||||
$this->timerQueue->extract(); // Timer was removed from queue.
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
$expiration -= $this->now();
|
|
||||||
|
|
||||||
if ($expiration < 0) {
|
if ($expiration < 0) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
$this->nowUpdateNeeded = true; // Loop will block, so trigger now update after blocking.
|
|
||||||
|
|
||||||
return $expiration;
|
return $expiration;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -312,8 +292,7 @@ class NativeDriver extends Driver
|
|||||||
case Watcher::DELAY:
|
case Watcher::DELAY:
|
||||||
case Watcher::REPEAT:
|
case Watcher::REPEAT:
|
||||||
$expiration = $this->now() + $watcher->value;
|
$expiration = $this->now() + $watcher->value;
|
||||||
$this->timerExpires[$watcher->id] = $expiration;
|
$this->timerQueue->insert($watcher, $expiration);
|
||||||
$this->timerQueue->insert([$watcher, $expiration], -$expiration);
|
|
||||||
break;
|
break;
|
||||||
|
|
||||||
case Watcher::SIGNAL:
|
case Watcher::SIGNAL:
|
||||||
@ -362,7 +341,7 @@ class NativeDriver extends Driver
|
|||||||
|
|
||||||
case Watcher::DELAY:
|
case Watcher::DELAY:
|
||||||
case Watcher::REPEAT:
|
case Watcher::REPEAT:
|
||||||
unset($this->timerExpires[$watcher->id]);
|
$this->timerQueue->remove($watcher);
|
||||||
break;
|
break;
|
||||||
|
|
||||||
case Watcher::SIGNAL:
|
case Watcher::SIGNAL:
|
||||||
|
Loading…
Reference in New Issue
Block a user