From 6f185ad17de72c8b3367aa0e999451b951a39a5c Mon Sep 17 00:00:00 2001 From: coderstephen Date: Sun, 12 Jul 2015 15:57:04 -0500 Subject: [PATCH] Handle signals inside AsyncSemaphore --- examples/fork.php | 64 +++++++++++++++++++++++++++---------- src/AsyncSemaphore.php | 16 +++++++--- src/Forking/ForkContext.php | 21 ++++++------ 3 files changed, 67 insertions(+), 34 deletions(-) diff --git a/examples/fork.php b/examples/fork.php index 9d6bc2c..d8da4fe 100644 --- a/examples/fork.php +++ b/examples/fork.php @@ -1,41 +1,71 @@ sem->acquire(); + sleep(4); + yield $this->sem->release(); $this->synchronized(function () { $this->data = 'progress'; }); + //throw new Exception('Testing exception bubbling.'); + sleep(2); } } -$context = new Test(); -$context->data = 'blank'; -$context->start(); +$generator = function () { + $before = memory_get_usage(); + $context = new Test(); + $after = memory_get_usage(); + $context->data = 'blank'; + printf("Object memory: %d bytes\n", $after - $before); + $context->start(); -$timer = Loop\periodic(1, function () use ($context) { - static $i; - $i = $i + 1 ?: 1; - print "Demonstrating how alive the parent is for the {$i}th time.\n"; - - $context->synchronized(function ($context) { - printf("Context data: '%s'\n", $context->data); + Loop\timer(1, function () use ($context) { + $context->sem->acquire()->then(function () use ($context) { + print "Finally got semaphore from child!\n"; + return $context->sem->release(); + }); }); -}); -$context->join()->then(function () use ($timer) { - print "Context done!\n"; - $timer->stop(); -}); + $timer = Loop\periodic(1, function () use ($context) { + static $i; + $i = $i + 1 ?: 1; + print "Demonstrating how alive the parent is for the {$i}th time.\n"; + if ($context->isRunning()) { + $context->synchronized(function ($context) { + printf("Context data: '%s'\n", $context->data); + }); + } + }); + + try { + yield $context->join(); + print "Context done!\n"; + } catch (Exception $e) { + print "Error from child!\n"; + print $e."\n"; + } finally { + $timer->stop(); + } +}; + +new Coroutine($generator()); Loop\run(); diff --git a/src/AsyncSemaphore.php b/src/AsyncSemaphore.php index 3912bd5..16cbde8 100644 --- a/src/AsyncSemaphore.php +++ b/src/AsyncSemaphore.php @@ -3,6 +3,7 @@ namespace Icicle\Concurrent; use Icicle\Concurrent\Exception\InvalidArgumentError; use Icicle\Concurrent\Forking\Synchronized; +use Icicle\Loop; use Icicle\Promise; /** @@ -59,6 +60,10 @@ class AsyncSemaphore extends Synchronized $this->queueSize = 0; $this->locks = $maxLocks; $this->processQueue = new \SplQueue(); + + Loop\signal(SIGUSR1, function () { + $this->handlePendingLocks(); + }); } /** @@ -77,7 +82,6 @@ class AsyncSemaphore extends Synchronized // Alright, we gotta get in and out as fast as possible. Deep breath... return $this->synchronized(function () { if ($this->locks > 0) { - printf("Async lock count: %d--\n", $this->locks); // Oh goody, a free lock! Acquire a lock and get outta here! --$this->locks; return Promise\resolve(); @@ -102,10 +106,9 @@ class AsyncSemaphore extends Synchronized { $this->synchronized(function () { if ($this->locks === $this->maxLocks) { - throw new \Exception(); + throw new SemaphoreException('No locks acquired to release.'); } - printf("Async lock count: %d++\n", $this->locks); ++$this->locks; }); @@ -122,13 +125,16 @@ class AsyncSemaphore extends Synchronized return Promise\resolve(); } - public function update() + /** + * Handles pending lock requests and resolves a pending acquire() call if + * new locks are available. + */ + private function handlePendingLocks() { $dequeue = false; $this->synchronized(function () use (&$dequeue) { if ($this->locks > 0 && !$this->waitQueue->isEmpty()) { - printf("Async lock count: %d--\n", $this->locks); --$this->locks; $dequeue = true; } diff --git a/src/Forking/ForkContext.php b/src/Forking/ForkContext.php index ae0a047..8c9b3f9 100644 --- a/src/Forking/ForkContext.php +++ b/src/Forking/ForkContext.php @@ -1,8 +1,10 @@ stop(); }); - $this->sem = new AsyncIpcSemaphore(); + $this->sem = new AsyncSemaphore(); } /** @@ -80,10 +82,6 @@ abstract class ForkContext extends Synchronized implements ContextInterface $this->pid = $pid; fclose($this->childSocket); - Loop\signal(SIGUSR1, function () { - $this->sem->update(); - }); - // Wait for the child process to send us a byte over the socket pair // to discover immediately when the process has completed. $this->parentSocket->read(1)->then(function ($data) { @@ -120,18 +118,17 @@ abstract class ForkContext extends Synchronized implements ContextInterface // child context by default is synchronous and uses the parent event // loop, so we need to stop the clone before doing any work in case it // is already running. + Loop\stop(); Loop\reInit(); Loop\clear(); - Loop\stop(); - - pcntl_signal(SIGUSR1, function () { - $this->sem->update(); - }); // Execute the context runnable and send the parent context the result. try { - $this->run(); - pcntl_signal_dispatch(); + $generator = $this->run(); + if ($generator instanceof \Generator) { + $coroutine = new Coroutine($generator); + } + Loop\run(); fwrite($this->childSocket, chr(self::MSG_DONE)); } catch (\Exception $exception) { fwrite($this->childSocket, chr(self::MSG_ERROR));