1
0
mirror of https://github.com/danog/parallel.git synced 2025-01-22 14:01:14 +01:00

Refactor ThreadedSemaphore and ThreadedMutex

This commit is contained in:
Aaron Piotrowski 2015-08-09 22:30:11 -05:00
parent b0ebadedf6
commit 6f657e889e
7 changed files with 183 additions and 133 deletions

View File

@ -0,0 +1,45 @@
<?php
namespace Icicle\Concurrent\Sync;
use Icicle\Coroutine;
/**
* @internal
*/
class InternalThreadedMutex extends \Threaded
{
const LATENCY_TIMEOUT = 0.01; // 10 ms
/**
* @var bool
*/
private $lock = true;
/**
* Attempts to acquire the lock and sleeps for a time if the lock could not be acquired.
*
* @return \Generator
*/
public function acquire()
{
$tsl = function () {
return ($this->lock ? $this->lock = false : true);
};
while ($this->synchronized($tsl)) {
yield Coroutine\sleep(self::LATENCY_TIMEOUT);
}
yield new Lock(function () {
$this->release();
});
}
/**
* Releases the lock.
*/
protected function release()
{
$this->lock = true;
}
}

View File

@ -0,0 +1,116 @@
<?php
namespace Icicle\Concurrent\Sync;
use Icicle\Coroutine;
/**
* An asynchronous semaphore based on pthreads' synchronization methods.
*
* @internal
*/
class InternalThreadedSemaphore extends \Threaded
{
const LATENCY_TIMEOUT = 0.01; // 10 ms
/**
* @var int
*/
private $nextId = 0;
/**
* @var int The number of available locks.
*/
private $locks;
/**
* @var array A queue of lock requests.
*/
private $waitQueue = [];
/**
* Creates a new semaphore.
*
* @param int $maxLocks The maximum number of processes that can lock the semaphore.
*/
public function __construct($maxLocks)
{
$this->locks = (int) $maxLocks;
if ($this->locks < 1) {
$this->locks = 1;
}
}
/**
* Gets the number of currently available locks.
*
* @return int The number of available locks.
*/
public function count()
{
return $this->locks;
}
/**
* Uses a double locking mechanism to acquire a lock without blocking. A
* synchronous mutex is used to make sure that the semaphore is queried one
* at a time to preserve the integrity of the semaphore itself. Then a lock
* count is used to check if a lock is available without blocking.
*
* If a lock is not available, we add the request to a queue and set a timer
* to check again in the future.
*/
public function acquire()
{
$tsl = function () use (&$waitId) {
// If there are no locks available or the wait queue is not empty,
// we need to wait our turn to acquire a lock.
if ($this->locks > 0 && empty($this->waitQueue)) {
--$this->locks;
return false;
}
// Since there are no free locks that we can claim yet, we need
// to add our request to the queue of other threads waiting for
// a free lock to make sure we don't steal one from another
// thread that has been waiting longer than us.
$waitId = $this->nextId++;
$this->waitQueue = array_merge($this->waitQueue, [$waitId]);
return true;
};
if ($this->synchronized($tsl)) {
$tsl = function () use (&$waitId) {
if ($this->locks > 0 && $this->waitQueue[0] === $waitId) {
// At this point, we have made sure that one of the locks in the
// semaphore is available for us to use, so decrement the lock count
// to mark it as taken, and return a lock object that represents the
// lock just acquired.
--$this->locks;
$this->waitQueue = array_slice($this->waitQueue, 1);
return false;
}
return true;
};
do {
yield Coroutine\sleep(self::LATENCY_TIMEOUT);
} while ($this->synchronized($tsl));
}
yield new Lock(function () {
$this->release();
});
}
/**
* Releases a lock from the semaphore.
*/
protected function release()
{
$this->synchronized(function () {
++$this->locks;
});
}
}

View File

@ -50,7 +50,7 @@ class Lock
public function release()
{
if ($this->released) {
throw new LockAlreadyReleasedError();
throw new LockAlreadyReleasedError('The lock has already been released!');
}
// Invoke the releaser function given to us by the synchronization source

View File

@ -12,9 +12,13 @@ namespace Icicle\Concurrent\Sync;
interface MutexInterface
{
/**
* @coroutine
*
* Acquires a lock on the mutex.
*
* @return \Generator Resolves with a lock object when the acquire is successful.
*
* @resolve \Icicle\Concurrent\Sync\Lock
*/
public function acquire();
}

View File

@ -10,9 +10,13 @@ namespace Icicle\Concurrent\Sync;
interface SemaphoreInterface
{
/**
* @coroutine
*
* Acquires a lock from the semaphore asynchronously.
*
* @return \Icicle\Promise\PromiseInterface<Lock> A promise resolved with a lock.
* @return \Generator
*
* @resolve \Icicle\Concurrent\Sync\Lock
*/
public function acquire();
}

View File

@ -1,10 +1,6 @@
<?php
namespace Icicle\Concurrent\Sync;
use Icicle\Concurrent\Exception\MutexException;
use Icicle\Coroutine;
use Mutex;
/**
* A thread-safe, asynchronous mutex using the pthreads locking mechanism.
*
@ -12,12 +8,10 @@ use Mutex;
*/
class ThreadedMutex implements MutexInterface
{
const LATENCY_TIMEOUT = 0.01; // 10 ms
/**
* @var long A unique handle ID on a system mutex.
* @var \Icicle\Concurrent\Sync\InternalThreadedMutex
*/
private $handle;
private $mutex;
/**
* Creates a new threaded mutex.
@ -26,7 +20,7 @@ class ThreadedMutex implements MutexInterface
*/
public function __construct($locked = false)
{
$this->handle = Mutex::create($locked);
$this->mutex = new InternalThreadedMutex();
}
/**
@ -34,39 +28,6 @@ class ThreadedMutex implements MutexInterface
*/
public function acquire()
{
// Try to access the lock. If we can't get the lock, set an asynchronous
// timer and try again.
while (!Mutex::trylock($this->handle)) {
yield Coroutine\sleep(self::LATENCY_TIMEOUT);
}
// Return a lock object that can be used to release the lock on the mutex.
yield new Lock(function (Lock $lock) {
$this->release();
});
}
/**
* Destroys the mutex.
*
* @throws MutexException Thrown if the operation fails.
*/
public function destroy()
{
if (!Mutex::destroy($this->handle)) {
throw new MutexException('Failed to destroy the mutex. Did you forget to unlock it first?');
}
}
/**
* Releases the lock from the mutex.
*
* @throws MutexException Thrown if the operation fails.
*/
protected function release()
{
if (!Mutex::unlock($this->handle)) {
throw new MutexException('Failed to unlock the mutex.');
}
return $this->mutex->acquire();
}
}

View File

@ -1,8 +1,6 @@
<?php
namespace Icicle\Concurrent\Sync;
use Icicle\Coroutine;
/**
* An asynchronous semaphore based on pthreads' synchronization methods.
*
@ -11,114 +9,36 @@ use Icicle\Coroutine;
* may not acquire a lock immediately when one is available and there may be a
* small delay. However, the small delay will not block the thread.
*/
class ThreadedSemaphore extends \Threaded implements SemaphoreInterface
class ThreadedSemaphore implements SemaphoreInterface
{
const LATENCY_TIMEOUT = 0.01; // 10 ms
/**
* @var int
* @var \Icicle\Concurrent\Sync\InternalThreadedSemaphore
*/
private $id = 0;
private $semaphore;
/**
* @var int The number of available locks.
*/
private $locks = 0;
/**
* @var array A queue of lock requests.
*/
private $waitQueue = [];
/**
* Creates a new semaphore.
*
* @param int $maxLocks The maximum number of processes that can lock the semaphore.
* @param int $maxLocks
*/
public function __construct($maxLocks)
{
$this->locks = $maxLocks;
$this->semaphore = new InternalThreadedSemaphore($maxLocks);
}
/**
* Gets the number of currently available locks.
*
* Note that this operation will block the current thread if another thread
* is acquiring or releasing a lock.
*
* @return int The number of available locks.
*/
public function count()
{
return $this->synchronized(function () {
return $this->locks;
});
return $this->semaphore->count();
}
/**
* {@inheritdoc}
*
* Uses a double locking mechanism to acquire a lock without blocking. A
* synchronous mutex is used to make sure that the semaphore is queried one
* at a time to preserve the integrity of the smaphore itself. Then a lock
* count is used to check if a lock is available without blocking.
*
* If a lock is not available, we add the request to a queue and set a timer
* to check again in the future.
*/
public function acquire()
{
// First, lock a mutex synchronously to prevent corrupting our semaphore
// data structure.
$this->lock();
try {
// If there are no locks available or the wait queue is not empty,
// we need to wait our turn to acquire a lock.
if ($this->locks <= 0 || !empty($this->waitQueue)) {
// Since there are no free locks that we can claim yet, we need
// to add our request to the queue of other threads waiting for
// a free lock to make sure we don't steal one from another
// thread that has been waiting longer than us.
$waitId = ++$this->id;
$this->waitQueue = array_merge($this->waitQueue, [$waitId]);
// Sleep for a while, unlocking the first lock so that other
// threads have a chance to release their locks. After we finish
// sleeping, we can check again to see if it is our turn to acquire
// a lock.
do {
$this->unlock();
yield Coroutine\sleep(self::LATENCY_TIMEOUT);
$this->lock();
} while ($this->locks <= 0 || $this->waitQueue[0] !== $waitId);
// We have reached our turn, so remove ourselves from the queue.
$this->waitQueue = array_slice($this->waitQueue, 1);
}
// At this point, we have made sure that one of the locks in the
// semaphore is available for us to use, so decrement the lock count
// to mark it as taken, and return a lock object that represents the
// lock just acquired.
--$this->locks;
yield new Lock(function (Lock $lock) {
$this->release();
});
} finally {
// Even if an exception is thrown, we want to unlock our synchronous
// mutex so we don't bloack any other threads.
$this->unlock();
}
return $this->semaphore->acquire();
}
/**
* Releases a lock from the semaphore.
*/
protected function release()
{
$this->synchronized(function () {
++$this->locks;
});
}
}
}