locks = $locks; } /** * Gets the number of currently available locks. * * @return int The number of available locks. */ public function count(): int { return $this->locks; } /** * @return \AsyncInterop\Promise */ public function acquire(): Promise { return new Coroutine($this->doAcquire()); } /** * Uses a double locking mechanism to acquire a lock without blocking. A * synchronous mutex is used to make sure that the semaphore is queried one * at a time to preserve the integrity of the semaphore itself. Then a lock * count is used to check if a lock is available without blocking. * * If a lock is not available, we add the request to a queue and set a timer * to check again in the future. */ private function doAcquire(): \Generator { $tsl = function () { // If there are no locks available or the wait queue is not empty, // we need to wait our turn to acquire a lock. if ($this->locks > 0) { --$this->locks; return false; } return true; }; while ($this->locks < 1 || $this->synchronized($tsl)) { yield new Pause(self::LATENCY_TIMEOUT); } return new Lock(function () { $this->release(); }); } /** * Releases a lock from the semaphore. */ protected function release() { $this->synchronized(function () { ++$this->locks; }); } }