mirror of
https://github.com/danog/parallel.git
synced 2025-01-22 14:01:14 +01:00
Use array instead of SplQueue
This commit is contained in:
parent
eb91972489
commit
ed08c738d5
@ -13,7 +13,12 @@ use Icicle\Coroutine;
|
||||
*/
|
||||
class ThreadedSemaphore extends \Threaded implements SemaphoreInterface
|
||||
{
|
||||
const LATENCY_TIMEOUT = 0.0001; // 100 μs
|
||||
const LATENCY_TIMEOUT = 0.01; // 10 ms
|
||||
|
||||
/**
|
||||
* @var int
|
||||
*/
|
||||
private $id = 0;
|
||||
|
||||
/**
|
||||
* @var int The number of available locks.
|
||||
@ -21,9 +26,9 @@ class ThreadedSemaphore extends \Threaded implements SemaphoreInterface
|
||||
private $locks = 0;
|
||||
|
||||
/**
|
||||
* @var \SplQueue A queue of lock requests.
|
||||
* @var array A queue of lock requests.
|
||||
*/
|
||||
private $waitQueue;
|
||||
private $waitQueue = [];
|
||||
|
||||
/**
|
||||
* Creates a new semaphore.
|
||||
@ -33,7 +38,6 @@ class ThreadedSemaphore extends \Threaded implements SemaphoreInterface
|
||||
public function __construct($maxLocks)
|
||||
{
|
||||
$this->locks = $maxLocks;
|
||||
$this->waitQueue = new \SplQueue();
|
||||
}
|
||||
|
||||
/**
|
||||
@ -71,15 +75,13 @@ class ThreadedSemaphore extends \Threaded implements SemaphoreInterface
|
||||
try {
|
||||
// If there are no locks available or the wait queue is not empty,
|
||||
// we need to wait our turn to acquire a lock.
|
||||
if ($this->locks <= 0 || !$this->waitQueue->isEmpty()) {
|
||||
if ($this->locks <= 0 || !empty($this->waitQueue)) {
|
||||
// Since there are no free locks that we can claim yet, we need
|
||||
// to add our request to the queue of other threads waiting for
|
||||
// a free lock to make sure we don't steal one from another
|
||||
// thread that has been waiting longer than us.
|
||||
$waitId = mt_rand();
|
||||
$waitQueue = $this->waitQueue;
|
||||
$waitQueue->enqueue($waitId);
|
||||
$this->waitQueue = $waitQueue;
|
||||
$waitId = ++$this->id;
|
||||
$this->waitQueue = array_merge($this->waitQueue, [$waitId]);
|
||||
|
||||
// Sleep for a while, unlocking the first lock so that other
|
||||
// threads have a chance to release their locks. After we finish
|
||||
@ -89,12 +91,10 @@ class ThreadedSemaphore extends \Threaded implements SemaphoreInterface
|
||||
$this->unlock();
|
||||
yield Coroutine\sleep(self::LATENCY_TIMEOUT);
|
||||
$this->lock();
|
||||
} while ($this->locks <= 0 || $this->waitQueue->bottom() !== $waitId);
|
||||
} while ($this->locks <= 0 || $this->waitQueue[0] !== $waitId);
|
||||
|
||||
// We have reached our turn, so remove ourselves from the queue.
|
||||
$waitQueue = $this->waitQueue;
|
||||
$waitQueue->dequeue();
|
||||
$this->waitQueue = $waitQueue;
|
||||
$this->waitQueue = array_slice($this->waitQueue, 1);
|
||||
}
|
||||
|
||||
// At this point, we have made sure that one of the locks in the
|
||||
|
Loading…
x
Reference in New Issue
Block a user