From 6f657e889e6a93200322ab8521794fd9482f150a Mon Sep 17 00:00:00 2001 From: Aaron Piotrowski Date: Sun, 9 Aug 2015 22:30:11 -0500 Subject: [PATCH] Refactor ThreadedSemaphore and ThreadedMutex --- src/Sync/InternalThreadedMutex.php | 45 ++++++++++ src/Sync/InternalThreadedSemaphore.php | 116 +++++++++++++++++++++++++ src/Sync/Lock.php | 2 +- src/Sync/MutexInterface.php | 4 + src/Sync/SemaphoreInterface.php | 6 +- src/Sync/ThreadedMutex.php | 47 +--------- src/Sync/ThreadedSemaphore.php | 96 ++------------------ 7 files changed, 183 insertions(+), 133 deletions(-) create mode 100644 src/Sync/InternalThreadedMutex.php create mode 100644 src/Sync/InternalThreadedSemaphore.php diff --git a/src/Sync/InternalThreadedMutex.php b/src/Sync/InternalThreadedMutex.php new file mode 100644 index 0000000..22ad26a --- /dev/null +++ b/src/Sync/InternalThreadedMutex.php @@ -0,0 +1,45 @@ +lock ? $this->lock = false : true); + }; + + while ($this->synchronized($tsl)) { + yield Coroutine\sleep(self::LATENCY_TIMEOUT); + } + + yield new Lock(function () { + $this->release(); + }); + } + + /** + * Releases the lock. + */ + protected function release() + { + $this->lock = true; + } +} \ No newline at end of file diff --git a/src/Sync/InternalThreadedSemaphore.php b/src/Sync/InternalThreadedSemaphore.php new file mode 100644 index 0000000..78f96b7 --- /dev/null +++ b/src/Sync/InternalThreadedSemaphore.php @@ -0,0 +1,116 @@ +locks = (int) $maxLocks; + if ($this->locks < 1) { + $this->locks = 1; + } + } + + /** + * Gets the number of currently available locks. + * + * @return int The number of available locks. + */ + public function count() + { + return $this->locks; + } + + /** + * Uses a double locking mechanism to acquire a lock without blocking. A + * synchronous mutex is used to make sure that the semaphore is queried one + * at a time to preserve the integrity of the semaphore itself. Then a lock + * count is used to check if a lock is available without blocking. + * + * If a lock is not available, we add the request to a queue and set a timer + * to check again in the future. + */ + public function acquire() + { + $tsl = function () use (&$waitId) { + // If there are no locks available or the wait queue is not empty, + // we need to wait our turn to acquire a lock. + if ($this->locks > 0 && empty($this->waitQueue)) { + --$this->locks; + return false; + } + + // Since there are no free locks that we can claim yet, we need + // to add our request to the queue of other threads waiting for + // a free lock to make sure we don't steal one from another + // thread that has been waiting longer than us. + $waitId = $this->nextId++; + $this->waitQueue = array_merge($this->waitQueue, [$waitId]); + + return true; + }; + + if ($this->synchronized($tsl)) { + $tsl = function () use (&$waitId) { + if ($this->locks > 0 && $this->waitQueue[0] === $waitId) { + // At this point, we have made sure that one of the locks in the + // semaphore is available for us to use, so decrement the lock count + // to mark it as taken, and return a lock object that represents the + // lock just acquired. + --$this->locks; + $this->waitQueue = array_slice($this->waitQueue, 1); + return false; + } + + return true; + }; + + do { + yield Coroutine\sleep(self::LATENCY_TIMEOUT); + } while ($this->synchronized($tsl)); + } + + yield new Lock(function () { + $this->release(); + }); + } + + /** + * Releases a lock from the semaphore. + */ + protected function release() + { + $this->synchronized(function () { + ++$this->locks; + }); + } +} diff --git a/src/Sync/Lock.php b/src/Sync/Lock.php index a63a80e..6fa6791 100644 --- a/src/Sync/Lock.php +++ b/src/Sync/Lock.php @@ -50,7 +50,7 @@ class Lock public function release() { if ($this->released) { - throw new LockAlreadyReleasedError(); + throw new LockAlreadyReleasedError('The lock has already been released!'); } // Invoke the releaser function given to us by the synchronization source diff --git a/src/Sync/MutexInterface.php b/src/Sync/MutexInterface.php index f66d9eb..21d2967 100644 --- a/src/Sync/MutexInterface.php +++ b/src/Sync/MutexInterface.php @@ -12,9 +12,13 @@ namespace Icicle\Concurrent\Sync; interface MutexInterface { /** + * @coroutine + * * Acquires a lock on the mutex. * * @return \Generator Resolves with a lock object when the acquire is successful. + * + * @resolve \Icicle\Concurrent\Sync\Lock */ public function acquire(); } diff --git a/src/Sync/SemaphoreInterface.php b/src/Sync/SemaphoreInterface.php index 80e8f36..66b0120 100644 --- a/src/Sync/SemaphoreInterface.php +++ b/src/Sync/SemaphoreInterface.php @@ -10,9 +10,13 @@ namespace Icicle\Concurrent\Sync; interface SemaphoreInterface { /** + * @coroutine + * * Acquires a lock from the semaphore asynchronously. * - * @return \Icicle\Promise\PromiseInterface A promise resolved with a lock. + * @return \Generator + * + * @resolve \Icicle\Concurrent\Sync\Lock */ public function acquire(); } diff --git a/src/Sync/ThreadedMutex.php b/src/Sync/ThreadedMutex.php index 06fee9e..5640155 100644 --- a/src/Sync/ThreadedMutex.php +++ b/src/Sync/ThreadedMutex.php @@ -1,10 +1,6 @@ handle = Mutex::create($locked); + $this->mutex = new InternalThreadedMutex(); } /** @@ -34,39 +28,6 @@ class ThreadedMutex implements MutexInterface */ public function acquire() { - // Try to access the lock. If we can't get the lock, set an asynchronous - // timer and try again. - while (!Mutex::trylock($this->handle)) { - yield Coroutine\sleep(self::LATENCY_TIMEOUT); - } - - // Return a lock object that can be used to release the lock on the mutex. - yield new Lock(function (Lock $lock) { - $this->release(); - }); - } - - /** - * Destroys the mutex. - * - * @throws MutexException Thrown if the operation fails. - */ - public function destroy() - { - if (!Mutex::destroy($this->handle)) { - throw new MutexException('Failed to destroy the mutex. Did you forget to unlock it first?'); - } - } - - /** - * Releases the lock from the mutex. - * - * @throws MutexException Thrown if the operation fails. - */ - protected function release() - { - if (!Mutex::unlock($this->handle)) { - throw new MutexException('Failed to unlock the mutex.'); - } + return $this->mutex->acquire(); } } diff --git a/src/Sync/ThreadedSemaphore.php b/src/Sync/ThreadedSemaphore.php index 42db961..c6573b5 100644 --- a/src/Sync/ThreadedSemaphore.php +++ b/src/Sync/ThreadedSemaphore.php @@ -1,8 +1,6 @@ locks = $maxLocks; + $this->semaphore = new InternalThreadedSemaphore($maxLocks); } /** * Gets the number of currently available locks. * - * Note that this operation will block the current thread if another thread - * is acquiring or releasing a lock. - * * @return int The number of available locks. */ public function count() { - return $this->synchronized(function () { - return $this->locks; - }); + return $this->semaphore->count(); } /** * {@inheritdoc} - * - * Uses a double locking mechanism to acquire a lock without blocking. A - * synchronous mutex is used to make sure that the semaphore is queried one - * at a time to preserve the integrity of the smaphore itself. Then a lock - * count is used to check if a lock is available without blocking. - * - * If a lock is not available, we add the request to a queue and set a timer - * to check again in the future. */ public function acquire() { - // First, lock a mutex synchronously to prevent corrupting our semaphore - // data structure. - $this->lock(); - - try { - // If there are no locks available or the wait queue is not empty, - // we need to wait our turn to acquire a lock. - if ($this->locks <= 0 || !empty($this->waitQueue)) { - // Since there are no free locks that we can claim yet, we need - // to add our request to the queue of other threads waiting for - // a free lock to make sure we don't steal one from another - // thread that has been waiting longer than us. - $waitId = ++$this->id; - $this->waitQueue = array_merge($this->waitQueue, [$waitId]); - - // Sleep for a while, unlocking the first lock so that other - // threads have a chance to release their locks. After we finish - // sleeping, we can check again to see if it is our turn to acquire - // a lock. - do { - $this->unlock(); - yield Coroutine\sleep(self::LATENCY_TIMEOUT); - $this->lock(); - } while ($this->locks <= 0 || $this->waitQueue[0] !== $waitId); - - // We have reached our turn, so remove ourselves from the queue. - $this->waitQueue = array_slice($this->waitQueue, 1); - } - - // At this point, we have made sure that one of the locks in the - // semaphore is available for us to use, so decrement the lock count - // to mark it as taken, and return a lock object that represents the - // lock just acquired. - --$this->locks; - yield new Lock(function (Lock $lock) { - $this->release(); - }); - } finally { - // Even if an exception is thrown, we want to unlock our synchronous - // mutex so we don't bloack any other threads. - $this->unlock(); - } + return $this->semaphore->acquire(); } - - /** - * Releases a lock from the semaphore. - */ - protected function release() - { - $this->synchronized(function () { - ++$this->locks; - }); - } -} +} \ No newline at end of file