1
0
mirror of https://github.com/danog/parallel.git synced 2024-11-30 04:39:01 +01:00

Remove old POSIX semaphores & shared objects

This commit is contained in:
coderstephen 2015-08-31 04:19:49 -05:00
parent ec3f5621b7
commit 3c531d2d02
7 changed files with 21 additions and 641 deletions

View File

@ -1,280 +0,0 @@
<?php
namespace Icicle\Concurrent\Forking;
use Icicle\Concurrent\Exception\SynchronizedMemoryException;
/**
* A synchronized object that safely shares its state across processes and
* provides methods for process synchronization.
*
* When used with forking, the object must be created prior to forking for both
* processes to access the synchronized object.
*/
abstract class SharedObject
{
/**
* @var int The default amount of bytes to allocate for the object.
*/
const SHM_DEFAULT_SIZE = 16384;
/**
* @var int The byte offset to the start of the object data in memory.
*/
const SHM_DATA_OFFSET = 5;
/**
* @var int The default permissions for other processes to access the object.
*/
const OBJECT_PERMISSIONS = 0600;
/**
* @var The shared memory segment key.
*/
private $__key;
/**
* @var An open handle to the shared memory segment.
*/
private $__shm;
/**
* @var array A local cache of property values that are synchronized.
*/
private $__synchronizedProperties = [];
/**
* Creates a new synchronized object.
*/
public function __construct()
{
$this->__key = abs(crc32(spl_object_hash($this)));
$this->__open($this->__key, 'c', static::OBJECT_PERMISSIONS, static::SHM_DEFAULT_SIZE);
$this->__write(0, pack('x5'));
$this->__initSynchronizedProperties();
}
/**
* Destroys the synchronized object.
*/
public function destroy()
{
if (!shmop_delete($this->__shm)) {
throw new SynchronizedMemoryException('Failed to discard shared memory block.');
}
}
/**
* Checks if a synchronized property is set.
*
* @param string $name The name of the property to check.
*
* @return bool True if the property is set, otherwise false.
*
* @internal
*/
final public function __isset($name)
{
$this->__readSynchronizedProperties();
return isset($this->__synchronizedProperties[$name]);
}
/**
* Gets the value of a synchronized property.
*
* @param string $name The name of the property to get.
*
* @return mixed The value of the property.
*
* @internal
*/
final public function __get($name)
{
$this->__readSynchronizedProperties();
return $this->__synchronizedProperties[$name];
}
/**
* Sets the value of a synchronized property.
*
* @param string $name The name of the property to set.
* @param mixed $value The value to set the property to.
*
* @internal
*/
final public function __set($name, $value)
{
$this->__readSynchronizedProperties();
$this->__synchronizedProperties[$name] = $value;
$this->__writeSynchronizedProperties();
}
/**
* Unsets a synchronized property.
*
* @param string $name The name of the property to unset.
*
* @internal
*/
final public function __unset($name)
{
$this->__readSynchronizedProperties();
if (isset($this->__synchronizedProperties[$name])) {
unset($this->__synchronizedProperties[$name]);
$this->__writeSynchronizedProperties();
}
}
/**
* Initializes the internal synchronized property table.
*
* This method does some ugly hackery to put on a nice face elsewhere. At
* call-time, the descendant type's defined and inherited properties are
* scanned for \@synchronized annotations.
*
* @internal
*/
private function __initSynchronizedProperties()
{
$class = new \ReflectionClass(get_called_class());
$synchronizedProperties = [];
// Find *all* defined and inherited properties of the called class (late
// binding) and get which class the property was defined in. This
// includes inherited private properties.
do {
foreach ($class->getProperties() as $property) {
if (!$property->isStatic()) {
$comment = $property->getDocComment();
if ($comment && strpos($comment, '@synchronized') !== false) {
$synchronizedProperties[$property->getName()] = $class->getName();
}
}
}
} while ($class = $class->getParentClass());
// Define a closure that deletes a property and returns its default
// value. This function will be called on the current object to delete
// synchronized properties (by being bound to the class scope that
// defined the property).
$unsetter = function ($name) {
$initValue = $this->{$name};
unset($this->{$name});
return $initValue;
};
// Cache the synchronized property table.
foreach ($synchronizedProperties as $property => $class) {
$this->__synchronizedProperties[$property] = $unsetter
->bindTo($this, $class)
->__invoke($property);
}
}
/**
* Reloads the object's property table from shared memory.
*
* @internal
*/
private function __readSynchronizedProperties()
{
$data = $this->__read(0, static::SHM_DATA_OFFSET);
$header = unpack('Cstate/Lsize', $data);
// State set to 1 indicates the memory is stale and has been moved to a
// new location. Move handle and try to read again.
if ($header['state'] === 1) {
shmop_close($this->__shm);
$this->__key = $header['size'];
$this->__open($this->__key, 'w', 0, 0);
$this->__readSynchronizedProperties();
return;
}
if ($header['size'] > 0) {
$data = $this->__read(static::SHM_DATA_OFFSET, $header['size']);
$this->__synchronizedProperties = unserialize($data);
}
}
/**
* Writes the object's property table to shared memory.
*
* @internal
*/
protected function __writeSynchronizedProperties()
{
$serialized = serialize($this->__synchronizedProperties);
$size = strlen($serialized);
// If we run out of space, we need to allocate a new shared memory
// segment that is larger than the current one. To coordinate with other
// processes, we will leave a message in the old segment that the segment
// has moved and along with the new key. The old segment will be discarded
// automatically after all other processes notice the change and close
// the old handle.
if (shmop_size($this->__shm) < $size + static::SHM_DATA_OFFSET) {
$this->__key = $this->__key < 0xffffffff ? $this->__key + 1 : rand(0x10, 0xfffffffe);
$header = pack('CL', 1, $this->__key);
$this->__write(0, $header);
$this->destroy();
shmop_close($this->__shm);
$this->__open($this->__key, 'c', static::OBJECT_PERMISSIONS, $size * 2);
}
$data = pack('xLa*', $size, $serialized);
$this->__write(0, $data);
}
/**
* Opens a shared memory handle.
*
* @param int $key The shared memory key.
* @param string $mode The mode to open the shared memory in.
* @param int $permissions Process permissions on the shared memory.
* @param int $size The size to crate the shared memory in bytes.
*
* @internal
*/
private function __open($key, $mode, $permissions, $size)
{
$this->__shm = shmop_open($key, $mode, $permissions, $size);
if ($this->__shm === false) {
throw new SynchronizedMemoryException('Failed to create shared memory block.');
}
}
/**
* Reads binary data from shared memory.
*
* @param int $offset The offset to read from.
* @param int $size The number of bytes to read.
*
* @return string The binary data at the given offset.
*
* @internal
*/
private function __read($offset, $size)
{
$data = shmop_read($this->__shm, $offset, $size);
if ($data === false) {
throw new SynchronizedMemoryException('Failed to read from shared memory block.');
}
return $data;
}
/**
* Writes binary data to shared memory.
*
* @param int $offset The offset to write to.
* @param string $data The binary data to write.
*
* @internal
*/
private function __write($offset, $data)
{
if (!shmop_write($this->__shm, $data, $offset)) {
throw new SynchronizedMemoryException('Failed to write to shared memory block.');
}
}
}

View File

@ -1,76 +0,0 @@
<?php
namespace Icicle\Concurrent\Forking;
use Icicle\Concurrent\SynchronizableInterface;
use Icicle\Concurrent\Sync\AsyncSemaphore;
/**
* A synchronized object that safely shares its state across processes and
* provides methods for process synchronization.
*
* When used with forking, the object must be created prior to forking for both
* processes to access the synchronized object.
*/
class Synchronized extends SharedObject implements SynchronizableInterface
{
/**
* @var AsyncSemaphore A semaphore used for locking the object data.
*/
private $semaphore;
/**
* Creates a new synchronized object.
*/
public function __construct()
{
parent::__construct();
$this->semaphore = new AsyncSemaphore(1);
}
/**
* Locks the object for read or write for the calling context.
*/
public function lock()
{
return $this->semaphore->acquire();
}
/**
* Unlocks the object.
*/
public function unlock()
{
$this->__writeSynchronizedProperties();
return $this->semaphore->release();
}
/**
* Invokes a function while maintaining a lock for the calling context.
*
* @param callable $callback The function to invoke.
*
* @return mixed The value returned by the callback.
*/
public function synchronized(callable $callback)
{
return $this->lock()->then(function () use ($callback) {
try {
$returnValue = $callback($this);
} finally {
$this->unlock();
}
return $returnValue;
});
}
/**
* Destroys the synchronized object safely on destruction.
*/
public function __destruct()
{
/*$this->synchronized(function () {
$this->destroy();
});*/
}
}

View File

@ -1,168 +0,0 @@
<?php
namespace Icicle\Concurrent\Sync;
use Icicle\Concurrent\Exception\InvalidArgumentError;
use Icicle\Concurrent\Exception\SemaphoreException;
use Icicle\Concurrent\Forking\SharedObject;
use Icicle\Loop;
use Icicle\Promise;
/**
* An asynchronous semaphore with non-blocking lock requests.
*
* To keep in sync with all handles to the async semaphore, a synchronous
* semaphore is used as a gatekeeper to access the lock count; such locks are
* guaranteed to perform very few memory read or write operations to reduce the
* semaphore latency.
*/
class AsyncSemaphore extends SharedObject
{
/**
* @var int The number of available locks.
* @synchronized
*/
private $locks;
/**
* @var int The maximum number of locks the semaphore allows.
* @synchronized
*/
private $maxLocks;
/**
* @var int A queue of processes waiting on locks.
* @synchronized
*/
private $processQueue;
/**
* @var \SplQueue A queue of promises waiting to acquire a lock within the
* current calling context.
*/
private $waitQueue;
/**
* @var Semaphore A synchronous semaphore for double locking.
*/
private $semaphore;
/**
* Creates a new asynchronous semaphore.
*
* @param int $maxLocks The maximum number of processes that can lock the semaphore.
*/
public function __construct($maxLocks = 1)
{
parent::__construct();
if (!is_int($maxLocks) || $maxLocks < 1) {
throw new InvalidArgumentError('Max locks must be a positive integer.');
}
$this->locks = $maxLocks;
$this->maxLocks = $maxLocks;
$this->processQueue = new \SplQueue();
$this->waitQueue = new \SplQueue();
$this->semaphore = new Semaphore(1);
Loop\signal(SIGUSR1, function () {
$this->handlePendingLocks();
});
}
/**
* Acquires a lock from the semaphore.
*
* @return PromiseInterface A promise resolved when a lock has been acquired.
*
* If there are one or more locks available, the returned promise is resolved
* immediately and the lock count is decreased. If no locks are available,
* the semaphore waits asynchronously for an unlock signal from another
* process before resolving.
*/
public function acquire()
{
$deferred = new Promise\Deferred();
// Alright, we gotta get in and out as fast as possible. Deep breath...
$this->semaphore->acquire();
try {
if ($this->locks > 0) {
// Oh goody, a free lock! Acquire a lock and get outta here!
--$this->locks;
$deferred->resolve();
} else {
$this->waitQueue->enqueue($deferred);
$this->processQueue->enqueue(getmypid());
$this->__writeSynchronizedProperties();
}
} finally {
$this->semaphore->release();
}
return $deferred->getPromise();
}
/**
* Releases a lock to the semaphore.
*
* @return PromiseInterface A promise resolved when a lock has been released.
*
* Note that this function is near-atomic and returns almost immediately. A
* promise is returned only for consistency.
*/
public function release()
{
$this->semaphore->acquire();
if ($this->locks === $this->maxLocks) {
$this->semaphore->release();
throw new SemaphoreException('No locks acquired to release.');
}
++$this->locks;
if (!$this->processQueue->isEmpty()) {
$pid = $this->processQueue->dequeue();
$pending = true;
}
$this->semaphore->release();
if ($pending) {
if ($pid === getmypid()) {
$this->waitQueue->dequeue()->resolve();
} else {
posix_kill($pid, SIGUSR1);
}
}
return Promise\resolve();
}
/**
* Handles pending lock requests and resolves a pending acquire() call if
* new locks are available.
*/
private function handlePendingLocks()
{
$dequeue = false;
$this->semaphore->acquire();
if ($this->locks > 0 && !$this->waitQueue->isEmpty()) {
--$this->locks;
$dequeue = true;
}
$this->semaphore->release();
if ($dequeue) {
$this->waitQueue->dequeue()->resolve();
}
}
public function destroy()
{
parent::destroy();
$this->semaphore->destroy();
}
}

View File

@ -17,6 +17,9 @@ use Icicle\Concurrent\Exception\SharedMemoryException;
*
* Note that accessing a shared object is not atomic. Access to a shared object
* should be protected with a mutex to preserve data integrity.
*
* When used with forking, the object must be created prior to forking for both
* processes to access the synchronized object.
*/
class Parcel implements ParcelInterface, \Serializable
{

View File

@ -1,111 +0,0 @@
<?php
namespace Icicle\Concurrent\Sync;
use Icicle\Concurrent\Exception\SemaphoreException;
/**
* A synchronous semaphore that uses System V IPC semaphores.
*/
class Semaphore implements SemaphoreInterface, \Serializable
{
/**
* @var int The key to the semaphore.
*/
private $key;
/**
* @var int The maximum number of locks.
*/
private $maxLocks;
/**
* @var resource An open handle to the semaphore.
*/
private $handle;
/**
* Creates a new semaphore with a given number of locks.
*
* @param int $maxLocks The maximum number of locks that can be acquired from the semaphore.
* @param int $permissions Permissions to access the semaphore.
*/
public function __construct($maxLocks = 1, $permissions = 0600)
{
$this->key = abs(crc32(spl_object_hash($this)));
$this->maxLocks = $maxLocks;
$this->handle = sem_get($this->key, $maxLocks, $permissions, 1);
if ($this->handle === false) {
throw new SemaphoreException('Failed to create the semaphore.');
}
}
/**
* Gets the maximum number of locks.
*
* @return int The maximum number of locks.
*/
public function getMaxLocks()
{
return $this->maxLocks;
}
/**
* Acquires a lock from the semaphore.
*
* Blocks until a lock can be acquired.
*/
public function acquire()
{
if (!sem_acquire($this->handle)) {
throw new SemaphoreException('Failed to lock the semaphore.');
}
}
/**
* Releases a lock to the semaphore.
*/
public function release()
{
if (!sem_release($this->handle)) {
throw new SemaphoreException('Failed to unlock the semaphore.');
}
}
/**
* Removes the semaphore if it still exists.
*/
public function destroy()
{
if (!@sem_remove($this->handle)) {
$error = error_get_last();
if ($error['type'] !== E_WARNING) {
throw new SemaphoreException('Failed to remove the semaphore.');
}
}
}
/**
* Serializes the semaphore.
*
* @return string The serialized semaphore.
*/
public function serialize()
{
return serialize([$this->key, $this->maxLocks]);
}
/**
* Unserializes a serialized semaphore.
*
* @param string $serialized The serialized semaphore.
*/
public function unserialize($serialized)
{
// Get the semaphore key and attempt to re-connect to the semaphore in
// memory.
list($this->key, $this->maxLocks) = unserialize($serialized);
$this->handle = sem_get($this->key, $maxLocks, 0600, 1);
}
}

View File

@ -2,19 +2,29 @@
namespace Icicle\Concurrent\Sync;
/**
* A counting semaphore interface.
* A non-blocking counting semaphore.
*
* Objects that implement this interface should guarantee that all operations
* are atomic.
*/
interface SemaphoreInterface
interface SemaphoreInterface extends \Countable
{
/**
* Gets the number of currently available locks.
*
* @return int The number of available locks.
*/
public function count();
/**
* @coroutine
*
* Acquires a lock from the semaphore asynchronously.
*
* @return \Generator
* If there are one or more locks available, this function resolve imsmediately with a lock and the lock count is
* decreased. If no locks are available, the semaphore waits asynchronously for a lock to become available.
*
* @return \Generator Resolves with a lock object when the acquire is successful.
*
* @resolve \Icicle\Concurrent\Sync\Lock
*/

View File

@ -2,7 +2,7 @@
namespace Icicle\Concurrent;
/**
* Interface for objects that can be synchronized across contexts.
* An object that can be synchronized for exclusive access across contexts.
*/
interface SynchronizableInterface
{
@ -11,11 +11,13 @@ interface SynchronizableInterface
*
* Invokes a function while maintaining a lock on the object.
*
* @param callable $callback The function to invoke.
* The given callback will be passed the object being synchronized on as the first argument.
*
* @param callable<self> $callback The synchronized function to invoke.
*
* @return \Generator
*
* @resolve mixed Return value of $callback.
* @resolve mixed The return value of $callback.
*/
public function synchronized(callable $callback);
}