2015-08-11 05:27:10 +02:00
|
|
|
<?php
|
|
|
|
namespace Icicle\Concurrent\Sync;
|
|
|
|
|
|
|
|
use Icicle\Concurrent\Exception\SemaphoreException;
|
|
|
|
use Icicle\Coroutine;
|
|
|
|
|
|
|
|
/**
 * A non-blocking, interprocess POSIX semaphore.
 *
 * Uses a message queue (PHP's msg_* functions) to store a queue of permits in a lock-free data structure. Every
 * message in the queue stands for one available lock: acquiring a lock takes a message out of the queue and
 * releasing a lock puts one back. This semaphore implementation is preferred over other implementations when
 * available, as it provides the best performance.
 *
 * Not compatible with Windows.
 */
class PosixSemaphore implements SemaphoreInterface, \Serializable
{
    /**
     * Number of seconds to wait between two attempts to acquire a lock.
     */
    const LATENCY_TIMEOUT = 0.01; // 10 ms

    /**
     * @var int The semaphore key.
     */
    private $key;

    /**
     * @var int The number of total locks.
     */
    private $maxLocks;

    /**
     * @var resource A message queue of available locks.
     */
    private $queue;

    /**
     * Creates a new semaphore with a given number of locks.
     *
     * @param int $maxLocks    The maximum number of locks that can be acquired from the semaphore.
     *                         Values lower than 1 are treated as 1.
     * @param int $permissions Permissions to access the semaphore.
     *
     * @throws SemaphoreException If the semaphore could not be created due to an internal error.
     */
    public function __construct($maxLocks, $permissions = 0600)
    {
        $this->init($maxLocks, $permissions);
    }

    /**
     * Creates the backing message queue and fills it with one message per lock.
     *
     * @param int $maxLocks    The maximum number of locks that can be acquired from the semaphore.
     *                         Values lower than 1 are treated as 1.
     * @param int $permissions Permissions to access the semaphore.
     *
     * @throws SemaphoreException If the semaphore could not be created due to an internal error.
     */
    private function init($maxLocks, $permissions)
    {
        $maxLocks = max(1, (int) $maxLocks);

        // The first candidate key is derived from this object's hash. Object hashes are reused once an
        // object is destroyed and can be identical in different processes, and msg_get_queue() silently
        // attaches to a queue that already exists. Filling such a queue would add locks on top of the
        // ones it already holds, so keep picking random keys until an unused one is found.
        // NOTE: another process can still create a queue between the check and msg_get_queue(); the
        // msg_* API offers no exclusive-create flag to close that window completely.
        $key = abs(crc32(spl_object_hash($this)));
        while (msg_queue_exists($key)) {
            $key = mt_rand(1, mt_getrandmax());
        }

        $this->key = $key;
        $this->maxLocks = $maxLocks;

        $this->queue = msg_get_queue($this->key, $permissions);
        if (!$this->queue) {
            throw new SemaphoreException('Failed to create the semaphore.');
        }

        $queue = $this->queue;

        try {
            // Fill the semaphore with locks.
            for ($filled = 0; $filled < $maxLocks; ++$filled) {
                $this->release();
            }
        } catch (\Exception $exception) {
            // The queue lives in the kernel and would outlive this half-built object, so remove it
            // before reporting the failure.
            msg_remove_queue($queue);
            $this->queue = null;

            throw $exception;
        }
    }

    /**
     * Checks if the semaphore has been freed.
     *
     * @return bool True if the semaphore has been freed, otherwise false.
     */
    public function isFreed()
    {
        return !is_resource($this->queue) || !msg_queue_exists($this->key);
    }

    /**
     * Gets the maximum number of locks held by the semaphore.
     *
     * @return int The maximum number of locks held by the semaphore.
     */
    public function getSize()
    {
        return $this->maxLocks;
    }

    /**
     * Gets the access permissions of the semaphore.
     *
     * The value is read from the 'msg_perm.mode' field reported by msg_stat_queue().
     *
     * @return int A permissions mode.
     */
    public function getPermissions()
    {
        $stat = msg_stat_queue($this->queue);
        return $stat['msg_perm.mode'];
    }

    /**
     * Sets the access permissions of the semaphore.
     *
     * The current user must have access to the semaphore in order to change the permissions.
     *
     * @param int $mode A permissions mode to set.
     *
     * @throws SemaphoreException If the operation failed.
     */
    public function setPermissions($mode)
    {
        if (!msg_set_queue($this->queue, [
            'msg_perm.mode' => $mode
        ])) {
            throw new SemaphoreException('Failed to change the semaphore permissions.');
        }
    }

    /**
     * {@inheritdoc}
     *
     * The value is the number of messages currently stored in the queue ('msg_qnum'), where each message
     * represents one lock that has not been acquired.
     */
    public function count()
    {
        $stat = msg_stat_queue($this->queue);
        return $stat['msg_qnum'];
    }

    /**
     * {@inheritdoc}
     *
     * Written as a generator: it polls the queue without blocking and, when no message is available, yields
     * Coroutine\sleep(self::LATENCY_TIMEOUT) before trying again. On success it yields a Lock whose callback
     * puts the message back into the queue.
     *
     * @throws SemaphoreException If receiving from the queue fails for any reason other than the queue being empty.
     */
    public function acquire()
    {
        do {
            // Attempt to acquire a lock from the semaphore. MSG_IPC_NOWAIT makes the call return
            // immediately instead of blocking when the queue is empty.
            if (@msg_receive($this->queue, 0, $type, 1, $chr, false, MSG_IPC_NOWAIT, $errno)) {
                // A free lock was found, so resolve with a lock object that can
                // be used to release the lock.
                yield new Lock(function (Lock $lock) {
                    $this->release();
                });
                return;
            }

            // Check for unusual errors. MSG_ENOMSG only means that no lock is free right now.
            if ($errno !== MSG_ENOMSG) {
                throw new SemaphoreException('Failed to acquire a lock.');
            }

            // The loop continues as long as the value sent back for the sleep is truthy
            // (presumably always the case for a completed sleep; confirm against Coroutine\sleep()).
        } while (yield Coroutine\sleep(self::LATENCY_TIMEOUT));
    }

    /**
     * Removes the semaphore if it still exists.
     *
     * Does nothing when the queue has already been removed.
     *
     * @throws SemaphoreException If the operation failed.
     */
    public function free()
    {
        if (is_resource($this->queue) && msg_queue_exists($this->key)) {
            if (!msg_remove_queue($this->queue)) {
                throw new SemaphoreException('Failed to free the semaphore.');
            }

            $this->queue = null;
        }
    }

    /**
     * Serializes the semaphore.
     *
     * Only the key and the size are stored; the queue itself is looked up again by key when unserializing.
     *
     * @return string The serialized semaphore.
     */
    public function serialize()
    {
        return serialize([$this->key, $this->maxLocks]);
    }

    /**
     * Unserializes a serialized semaphore.
     *
     * If no queue with the stored key exists any more, the queue is left unset and isFreed() reports true.
     *
     * @param string $serialized The serialized semaphore.
     */
    public function unserialize($serialized)
    {
        // Get the semaphore key and attempt to re-connect to the semaphore in memory.
        list($this->key, $this->maxLocks) = unserialize($serialized);

        if (msg_queue_exists($this->key)) {
            $this->queue = msg_get_queue($this->key);
        }
    }

    /**
     * Clones the semaphore, creating a new semaphore with the same size and permissions.
     *
     * The permissions are read from the original queue before init() replaces it with a new one.
     */
    public function __clone()
    {
        $this->init($this->maxLocks, $this->getPermissions());
    }

    /**
     * Releases a lock from the semaphore.
     *
     * Puts a single one-byte message of type 1 back into the queue.
     *
     * @throws SemaphoreException If the operation failed.
     */
    protected function release()
    {
        // Call send in non-blocking mode. If the call fails because the queue
        // is full, then the number of locks configured is too large.
        if (!@msg_send($this->queue, 1, "\0", false, false, $errno)) {
            if ($errno === MSG_EAGAIN) {
                throw new SemaphoreException('The semaphore size is larger than the system allows.');
            }

            throw new SemaphoreException('Failed to release the lock.');
        }
    }
}
|