1
0
mirror of https://github.com/danog/parallel.git synced 2024-11-26 20:34:40 +01:00

Synchronize using shmop and docblock annotations

This commit is contained in:
coderstephen 2015-07-11 05:01:36 -05:00
parent fefa5e706a
commit 277e5ded02
2 changed files with 177 additions and 36 deletions

View File

@ -2,6 +2,7 @@
namespace Icicle\Concurrent;
use Icicle\Concurrent\Exception\InvalidArgumentError;
use Icicle\Concurrent\Forking\Synchronized;
use Icicle\Promise;
/**
@ -20,6 +21,26 @@ class AsyncSemaphore extends Synchronized
*/
private $waitQueue;
/**
* @synchronized
*/
private $maxLocks;
/**
* @synchronized
*/
private $queueSize;
/**
* @synchronized
*/
private $locks;
/**
* @synchronized
*/
private $processQueue;
/**
* Creates a new asynchronous semaphore.
*
@ -64,7 +85,6 @@ class AsyncSemaphore extends Synchronized
$deferred = new Promise\Deferred();
$this->waitQueue->enqueue($deferred);
$this->processQueue->enqueue(getmypid());
var_dump($this->processQueue);
return $deferred->getPromise();
}
});
@ -89,7 +109,6 @@ class AsyncSemaphore extends Synchronized
++$this->locks;
});
var_dump($this->processQueue);
if (!$this->processQueue->isEmpty()) {
$pid = $this->processQueue->dequeue();

View File

@ -13,8 +13,9 @@ use Icicle\Concurrent\Exception\SynchronizedMemoryException;
*/
abstract class Synchronized
{
private $memoryBlock;
private $memoryKey;
private $__key;
private $__shm;
private $__synchronizedProperties = [];
protected $semaphore;
/**
@ -22,12 +23,11 @@ abstract class Synchronized
*/
public function __construct()
{
$this->__key = abs(crc32(spl_object_hash($this)));
$this->__open($this->__key, 'c', 0600, 1024);
$this->__write(0, pack('x5'));
$this->__initSynchronizedProperties();
$this->semaphore = new Semaphore();
$this->memoryKey = abs(crc32(spl_object_hash($this)));
$this->memoryBlock = shm_attach($this->memoryKey, 8192);
if (!is_resource($this->memoryBlock)) {
throw new SynchronizedMemoryException('Failed to create shared memory block.');
}
}
/**
@ -43,6 +43,7 @@ abstract class Synchronized
*/
public function unlock()
{
$this->__writeSynchronizedProperties();
$this->semaphore->release();
}
@ -67,17 +68,23 @@ abstract class Synchronized
}
/**
* Destroys the synchronized object safely.
* Destroys the synchronized object.
*/
protected function destroy()
{
if (!shmop_delete($this->__shm)) {
throw new SynchronizedMemoryException('Failed to discard shared memory block.');
}
}
/**
* Destroys the synchronized object safely on destruction.
*/
public function __destruct()
{
if (is_resource($this->memoryBlock)) {
$this->synchronized(function () {
if (!shm_remove($this->memoryBlock)) {
throw new SynchronizedMemoryException('Failed to discard shared memory block.');
}
});
}
$this->synchronized(function () {
$this->destroy();
});
}
/**
@ -85,8 +92,8 @@ abstract class Synchronized
*/
public function __isset($name)
{
$key = abs(crc32($name));
return shm_has_var($this->memoryBlock, $key);
$this->__readSynchronizedProperties();
return isset($this->__synchronizedProperties[$name]);
}
/**
@ -94,16 +101,8 @@ abstract class Synchronized
*/
public function __get($name)
{
$key = abs(crc32($name));
if (shm_has_var($this->memoryBlock, $key)) {
$serialized = shm_get_var($this->memoryBlock, $key);
if ($serialized === false) {
throw new SynchronizedMemoryException('Failed to read from shared memory block.');
}
return unserialize($serialized);
}
$this->__readSynchronizedProperties();
return $this->__synchronizedProperties[$name];
}
/**
@ -111,10 +110,9 @@ abstract class Synchronized
*/
public function __set($name, $value)
{
$key = abs(crc32($name));
if (!shm_put_var($this->memoryBlock, $key, serialize($value))) {
throw new SynchronizedMemoryException('Failed to write to shared memory block.');
}
$this->__readSynchronizedProperties();
$this->__synchronizedProperties[$name] = $value;
$this->__writeSynchronizedProperties();
}
/**
@ -122,9 +120,133 @@ abstract class Synchronized
*/
public function __unset($name)
{
$key = abs(crc32($name));
if (!shm_remove_var($this->memoryBlock, $key)) {
throw new SynchronizedMemoryException('Failed to erase data in shared memory block.');
$this->__readSynchronizedProperties();
if (isset($this->__synchronizedProperties[$name])) {
unset($this->__synchronizedProperties[$name]);
$this->__writeSynchronizedProperties();
}
}
/**
* Initializes the internal synchronized property table.
*
* This method does some ugly hackery to put on a nice face elsewhere. At
* call-time, the descendant type's defined and inherited properties are
* scanned for \@synchronized annotations.
*
* @internal
*/
private function __initSynchronizedProperties()
{
$class = new \ReflectionClass(get_called_class());
$synchronizedProperties = [];
do {
foreach ($class->getProperties() as $property) {
if (!$property->isStatic()) {
$comment = $property->getDocComment();
if ($comment && strpos($comment, '@synchronized') !== false) {
$synchronizedProperties[$property->getName()] = $class->getName();
}
}
}
} while ($class = $class->getParentClass());
$unsetter = function ($name) {
$initValue = $this->{$name};
unset($this->{$name});
return $initValue;
};
foreach ($synchronizedProperties as $property => $class) {
$this->__synchronizedProperties[$property] = $unsetter
->bindTo($this, $class)
->__invoke($property);
}
}
/**
* @internal
*/
private function __readSynchronizedProperties()
{
$data = $this->__read(0, 5);
$header = unpack('Cstate/Lsize', $data);
// State set to 1 indicates the memory is stale and has been moved to a
// new location. Move handle and try to read again.
if ($header['state'] === 1) {
shmop_close($this->__shm);
$this->__key = $header['size'];
$this->__open($this->__key, 'w', 0, 0);
$this->__readSynchronizedProperties();
return;
}
if ($header['size'] > 0) {
$data = $this->__read(5, $header['size']);
$this->__synchronizedProperties = unserialize($data);
}
}
/**
* @internal
*/
private function __writeSynchronizedProperties()
{
$serialized = serialize($this->__synchronizedProperties);
$size = strlen($serialized);
// If we run out of space, we need to allocate a new shared memory
// segment that is larger than the current one. To coordinate with other
// processes, we will leave a message in the old segment that the segment
// has moved and along with the new key. The old segment will be discarded
// automatically after all other processes notice the change and close
// the old handle.
if (shmop_size($this->__shm) < $size + 5) {
$this->__key = $this->__key < 0xffffffff ? $this->__key + 1 : rand(0x10, 0xfffffffe);
$header = pack('CL', 1, $this->__key);
$this->__write(0, $header);
$this->destroy();
shmop_close($this->__shm);
$this->__open($this->__key, 'c', 0600, $size * 2);
}
$data = pack('xLa*', $size, $serialized);
$this->__write(0, $data);
}
/**
* @internal
*/
private function __open($key, $mode, $permissions, $size)
{
$this->__shm = shmop_open($key, $mode, $permissions, $size);
if ($this->__shm === false) {
throw new SynchronizedMemoryException('Failed to create shared memory block.');
}
}
/**
* @internal
*/
private function __read($offset, $size)
{
$data = shmop_read($this->__shm, $offset, $size);
if ($data === false) {
throw new SynchronizedMemoryException('Failed to read from shared memory block.');
}
return $data;
}
/**
* @internal
*/
private function __write($offset, $data)
{
if (!shmop_write($this->__shm, $data, $offset)) {
throw new SynchronizedMemoryException('Failed to write to shared memory block.');
}
}
}