From d2ab1cd0a135495be3722c72429075259ac82989 Mon Sep 17 00:00:00 2001 From: coderstephen Date: Mon, 10 Aug 2015 22:27:10 -0500 Subject: [PATCH] First working async POSIX semaphore using shared objects --- src/Sync/PosixSemaphore.php | 178 ++++++++++++++++++++++++++++++ tests/Sync/PosixSemaphoreTest.php | 55 +++++++++ 2 files changed, 233 insertions(+) create mode 100644 src/Sync/PosixSemaphore.php create mode 100644 tests/Sync/PosixSemaphoreTest.php diff --git a/src/Sync/PosixSemaphore.php b/src/Sync/PosixSemaphore.php new file mode 100644 index 0000000..e4f4527 --- /dev/null +++ b/src/Sync/PosixSemaphore.php @@ -0,0 +1,178 @@ +key = abs(crc32(spl_object_hash($this))); + + $this->semaphore = sem_get($this->key, 1, 0600, 1); + if (!$this->semaphore) { + throw new SemaphoreException('Failed to create the semaphore.'); + } + + $this->data = new SharedObject([ + 'locks' => (int)$maxLocks, + 'waitQueue' => [], + ]); + } + + /** + * Acquires a lock from the semaphore. + * + * Blocks until a lock can be acquired. + */ + public function acquire() + { + $this->lock(); + + try { + $data = $this->data->deref(); + + // Attempt to acquire a lock from the semaphore. If no locks are + // available immediately, we have a lot of work to do... + if ($data['locks'] <= 0 || !empty($data['waitQueue'])) { + // Since there are no free locks that we can claim yet, we need + // to add our request to the queue of other threads waiting for + // a free lock to make sure we don't steal one from another + // thread that has been waiting longer than us. + $id = mt_rand(); + $data['waitQueue'][] = $id; + $this->data->set($data); + + // Sleep for a while, giving a chance for other threads to release + // their locks. After we finish sleeping, we can check again to see + // if it is our turn to acquire a lock. + do { + $this->unlock(); + yield Coroutine\sleep(self::LATENCY_TIMEOUT); + $this->lock(); + $data = $this->data->deref(); + } while ($data['locks'] <= 0 || $data['waitQueue'][0] !== $id); + } + + // At this point, we have made sure that one of the locks in the + // semaphore is available for us to use, so decrement the lock count + // to mark it as taken, and return a lock object that represents the + // lock just acquired. + $data = $this->data->deref(); + --$data['locks']; + $data['waitQueue'] = array_slice($data['waitQueue'], 1); + $this->data->set($data); + + yield new Lock(function (Lock $lock) { + $this->release(); + }); + } finally { + $this->unlock(); + } + } + + /** + * Removes the semaphore if it still exists. + */ + public function destroy() + { + if (!@sem_remove($this->semaphore)) { + $error = error_get_last(); + + if ($error['type'] !== E_WARNING) { + throw new SemaphoreException('Failed to remove the semaphore.'); + } + } + + $this->data->free(); + } + + /** + * Serializes the semaphore. + * + * @return string The serialized semaphore. + */ + public function serialize() + { + return serialize([$this->key, $this->data]); + } + + /** + * Unserializes a serialized semaphore. + * + * @param string $serialized The serialized semaphore. + */ + public function unserialize($serialized) + { + // Get the semaphore key and attempt to re-connect to the semaphore in + // memory. + list($this->key, $this->data) = unserialize($serialized); + $this->semaphore = sem_get($this->key, 1, 0600, 1); + } + + /** + * Releases a lock from the semaphore. + */ + protected function release() + { + $this->lock(); + + $data = $this->data->deref(); + ++$data['locks']; + $this->data->set($data); + + $this->unlock(); + } + + /** + * Locks the gatekeeper semaphore. + */ + private function lock() + { + if (!sem_acquire($this->semaphore)) { + throw new SemaphoreException('Failed to lock the semaphore.'); + } + } + + /** + * Unlocks the gatekeeper semaphore. + */ + private function unlock() + { + if (!sem_release($this->semaphore)) { + throw new SemaphoreException('Failed to unlock the semaphore.'); + } + } +} diff --git a/tests/Sync/PosixSemaphoreTest.php b/tests/Sync/PosixSemaphoreTest.php new file mode 100644 index 0000000..5eb0a1f --- /dev/null +++ b/tests/Sync/PosixSemaphoreTest.php @@ -0,0 +1,55 @@ +acquire()); + $lock->release(); + $this->assertTrue($lock->isReleased()); + }); + + Loop\run(); + } + + public function testAcquireMultiple() + { + ob_end_flush(); + + $this->assertRunTimeBetween(function () { + Coroutine\create(function () { + $semaphore = new PosixSemaphore(1); + + $lock1 = (yield $semaphore->acquire()); + Loop\timer(0.5, function () use ($lock1) { + $lock1->release(); + }); + + $lock2 = (yield $semaphore->acquire()); + Loop\timer(0.5, function () use ($lock2) { + $lock2->release(); + }); + + $lock3 = (yield $semaphore->acquire()); + Loop\timer(0.5, function () use ($lock3) { + $lock3->release(); + }); + }); + + Loop\run(); + }, 1.5, 1.65); + + ob_start(); + } +}