1
0
mirror of https://github.com/danog/parallel.git synced 2025-01-22 14:01:14 +01:00

Remove FIFO from threaded semaphore; improve threaded mutex

This commit is contained in:
Aaron Piotrowski 2015-09-01 16:19:59 -05:00
parent 2af5fff01b
commit 1c4244d0b9
2 changed files with 5 additions and 29 deletions

View File

@ -27,7 +27,7 @@ class Mutex extends \Threaded
return ($this->lock ? $this->lock = false : true);
};
while ($this->synchronized($tsl)) {
while ($this->lock && $this->synchronized($tsl)) {
yield Coroutine\sleep(self::LATENCY_TIMEOUT);
}

View File

@ -62,42 +62,18 @@ class Semaphore extends \Threaded
*/
public function acquire()
{
$tsl = function () use (&$waitId) {
$tsl = function () {
// If there are no locks available or the wait queue is not empty,
// we need to wait our turn to acquire a lock.
if ($this->locks > 0 && empty($this->waitQueue)) {
if ($this->locks > 0) {
--$this->locks;
return false;
}
// Since there are no free locks that we can claim yet, we need
// to add our request to the queue of other threads waiting for
// a free lock to make sure we don't steal one from another
// thread that has been waiting longer than us.
$waitId = $this->nextId++;
$this->waitQueue = array_merge($this->waitQueue, [$waitId]);
return true;
};
if ($this->synchronized($tsl)) {
$tsl = function () use (&$waitId) {
if ($this->locks > 0 && $this->waitQueue[0] === $waitId) {
// At this point, we have made sure that one of the locks in the
// semaphore is available for us to use, so decrement the lock count
// to mark it as taken, and return a lock object that represents the
// lock just acquired.
--$this->locks;
$this->waitQueue = array_slice($this->waitQueue, 1);
return false;
}
return true;
};
do {
yield Coroutine\sleep(self::LATENCY_TIMEOUT);
} while ($this->synchronized($tsl));
while ($this->locks > 0 && $this->synchronized($tsl)) {
yield Coroutine\sleep(self::LATENCY_TIMEOUT);
}
yield new Lock(function () {