1
0
mirror of https://github.com/danog/parallel.git synced 2024-11-26 20:34:40 +01:00

Handle signals inside AsyncSemaphore

This commit is contained in:
coderstephen 2015-07-12 15:57:04 -05:00
parent 24a1e2fc8f
commit 6f185ad17d
3 changed files with 67 additions and 34 deletions

View File

@ -1,41 +1,71 @@
<?php <?php
require dirname(__DIR__).'/vendor/autoload.php'; require dirname(__DIR__).'/vendor/autoload.php';
use Icicle\Loop;
use Icicle\Concurrent\Forking\ForkContext; use Icicle\Concurrent\Forking\ForkContext;
use Icicle\Coroutine\Coroutine;
use Icicle\Loop;
class Test extends ForkContext class Test extends ForkContext
{ {
/**
* @synchronized
*/
public $data;
public function run() public function run()
{ {
print "Child sleeping for 5 seconds...\n"; print "Child sleeping for 5 seconds...\n";
sleep(3); yield $this->sem->acquire();
sleep(4);
yield $this->sem->release();
$this->synchronized(function () { $this->synchronized(function () {
$this->data = 'progress'; $this->data = 'progress';
}); });
//throw new Exception('Testing exception bubbling.');
sleep(2); sleep(2);
} }
} }
$context = new Test(); $generator = function () {
$context->data = 'blank'; $before = memory_get_usage();
$context->start(); $context = new Test();
$after = memory_get_usage();
$context->data = 'blank';
printf("Object memory: %d bytes\n", $after - $before);
$context->start();
$timer = Loop\periodic(1, function () use ($context) { Loop\timer(1, function () use ($context) {
static $i; $context->sem->acquire()->then(function () use ($context) {
$i = $i + 1 ?: 1; print "Finally got semaphore from child!\n";
print "Demonstrating how alive the parent is for the {$i}th time.\n"; return $context->sem->release();
});
$context->synchronized(function ($context) {
printf("Context data: '%s'\n", $context->data);
}); });
});
$context->join()->then(function () use ($timer) { $timer = Loop\periodic(1, function () use ($context) {
print "Context done!\n"; static $i;
$timer->stop(); $i = $i + 1 ?: 1;
}); print "Demonstrating how alive the parent is for the {$i}th time.\n";
if ($context->isRunning()) {
$context->synchronized(function ($context) {
printf("Context data: '%s'\n", $context->data);
});
}
});
try {
yield $context->join();
print "Context done!\n";
} catch (Exception $e) {
print "Error from child!\n";
print $e."\n";
} finally {
$timer->stop();
}
};
new Coroutine($generator());
Loop\run(); Loop\run();

View File

@ -3,6 +3,7 @@ namespace Icicle\Concurrent;
use Icicle\Concurrent\Exception\InvalidArgumentError; use Icicle\Concurrent\Exception\InvalidArgumentError;
use Icicle\Concurrent\Forking\Synchronized; use Icicle\Concurrent\Forking\Synchronized;
use Icicle\Loop;
use Icicle\Promise; use Icicle\Promise;
/** /**
@ -59,6 +60,10 @@ class AsyncSemaphore extends Synchronized
$this->queueSize = 0; $this->queueSize = 0;
$this->locks = $maxLocks; $this->locks = $maxLocks;
$this->processQueue = new \SplQueue(); $this->processQueue = new \SplQueue();
Loop\signal(SIGUSR1, function () {
$this->handlePendingLocks();
});
} }
/** /**
@ -77,7 +82,6 @@ class AsyncSemaphore extends Synchronized
// Alright, we gotta get in and out as fast as possible. Deep breath... // Alright, we gotta get in and out as fast as possible. Deep breath...
return $this->synchronized(function () { return $this->synchronized(function () {
if ($this->locks > 0) { if ($this->locks > 0) {
printf("Async lock count: %d--\n", $this->locks);
// Oh goody, a free lock! Acquire a lock and get outta here! // Oh goody, a free lock! Acquire a lock and get outta here!
--$this->locks; --$this->locks;
return Promise\resolve(); return Promise\resolve();
@ -102,10 +106,9 @@ class AsyncSemaphore extends Synchronized
{ {
$this->synchronized(function () { $this->synchronized(function () {
if ($this->locks === $this->maxLocks) { if ($this->locks === $this->maxLocks) {
throw new \Exception(); throw new SemaphoreException('No locks acquired to release.');
} }
printf("Async lock count: %d++\n", $this->locks);
++$this->locks; ++$this->locks;
}); });
@ -122,13 +125,16 @@ class AsyncSemaphore extends Synchronized
return Promise\resolve(); return Promise\resolve();
} }
public function update() /**
* Handles pending lock requests and resolves a pending acquire() call if
* new locks are available.
*/
private function handlePendingLocks()
{ {
$dequeue = false; $dequeue = false;
$this->synchronized(function () use (&$dequeue) { $this->synchronized(function () use (&$dequeue) {
if ($this->locks > 0 && !$this->waitQueue->isEmpty()) { if ($this->locks > 0 && !$this->waitQueue->isEmpty()) {
printf("Async lock count: %d--\n", $this->locks);
--$this->locks; --$this->locks;
$dequeue = true; $dequeue = true;
} }

View File

@ -1,8 +1,10 @@
<?php <?php
namespace Icicle\Concurrent\Forking; namespace Icicle\Concurrent\Forking;
use Icicle\Concurrent\AsyncSemaphore;
use Icicle\Concurrent\ContextInterface; use Icicle\Concurrent\ContextInterface;
use Icicle\Concurrent\Exception\ContextAbortException; use Icicle\Concurrent\Exception\ContextAbortException;
use Icicle\Coroutine\Coroutine;
use Icicle\Loop; use Icicle\Loop;
use Icicle\Promise\Deferred; use Icicle\Promise\Deferred;
use Icicle\Socket\Stream\DuplexStream; use Icicle\Socket\Stream\DuplexStream;
@ -33,7 +35,7 @@ abstract class ForkContext extends Synchronized implements ContextInterface
$this->stop(); $this->stop();
}); });
$this->sem = new AsyncIpcSemaphore(); $this->sem = new AsyncSemaphore();
} }
/** /**
@ -80,10 +82,6 @@ abstract class ForkContext extends Synchronized implements ContextInterface
$this->pid = $pid; $this->pid = $pid;
fclose($this->childSocket); fclose($this->childSocket);
Loop\signal(SIGUSR1, function () {
$this->sem->update();
});
// Wait for the child process to send us a byte over the socket pair // Wait for the child process to send us a byte over the socket pair
// to discover immediately when the process has completed. // to discover immediately when the process has completed.
$this->parentSocket->read(1)->then(function ($data) { $this->parentSocket->read(1)->then(function ($data) {
@ -120,18 +118,17 @@ abstract class ForkContext extends Synchronized implements ContextInterface
// child context by default is synchronous and uses the parent event // child context by default is synchronous and uses the parent event
// loop, so we need to stop the clone before doing any work in case it // loop, so we need to stop the clone before doing any work in case it
// is already running. // is already running.
Loop\stop();
Loop\reInit(); Loop\reInit();
Loop\clear(); Loop\clear();
Loop\stop();
pcntl_signal(SIGUSR1, function () {
$this->sem->update();
});
// Execute the context runnable and send the parent context the result. // Execute the context runnable and send the parent context the result.
try { try {
$this->run(); $generator = $this->run();
pcntl_signal_dispatch(); if ($generator instanceof \Generator) {
$coroutine = new Coroutine($generator);
}
Loop\run();
fwrite($this->childSocket, chr(self::MSG_DONE)); fwrite($this->childSocket, chr(self::MSG_DONE));
} catch (\Exception $exception) { } catch (\Exception $exception) {
fwrite($this->childSocket, chr(self::MSG_ERROR)); fwrite($this->childSocket, chr(self::MSG_ERROR));