1
0
mirror of https://github.com/danog/parallel.git synced 2025-01-22 22:11:11 +01:00

Update fork context to support synchronization and asynchronous join

This commit is contained in:
coderstephen 2015-07-10 02:23:08 -05:00
parent 3a30fa1d4c
commit 27549c5c9c
3 changed files with 184 additions and 42 deletions

View File

@ -1,30 +1,40 @@
<?php
require dirname(__DIR__).'/vendor/autoload.php';
use Icicle\Loop;
use Icicle\Concurrent\Forking\ForkContext;
class Test extends ForkContext
{
public function run()
{
print "Exiting in 5 seconds...\n";
sleep(5);
print "Context exiting...\n";
print "Child sleeping for 5 seconds...\n";
sleep(3);
$this->synchronized(function () {
$this->data = 'progress';
});
sleep(2);
}
}
$context = new Test();
$context->start()->then(function () {
print "Context finished!\n";
Icicle\Loop\stop();
$context->data = 'blank';
$context->start();
$context->join()->then(function () {
print "Context done!\n";
Loop\stop();
});
print "Context started.\n";
Icicle\Loop\periodic(1, function () {
Loop\periodic(1, function () use ($context) {
static $i;
$i = $i + 1 ?: 1;
print "Demonstrating how alive the parent is for the {$i}th time.\n";
$context->synchronized(function ($context) {
printf("Context data: '%s'\n", $context->data);
});
});
Icicle\Loop\run();
Loop\run();

View File

@ -1,23 +1,54 @@
<?php
namespace Icicle\Concurrent\Forking;
use Icicle\Loop;
use Icicle\Concurrent\Context;
use Icicle\Socket\Stream\DuplexStream;
use Icicle\Concurrent\ContextAbortException;
use Icicle\Concurrent\Semaphore;
use Icicle\Loop;
use Icicle\Promise\Deferred;
use Icicle\Socket\Stream\DuplexStream;
abstract class ForkContext implements Context
/**
* Implements a UNIX-compatible context using forked processes.
*/
abstract class ForkContext extends Synchronizable implements Context
{
const MSG_DONE = 1;
const MSG_ERROR = 2;
private $parentSocket;
private $childSocket;
private $pid = 0;
private $isChild = false;
private $deferred;
private $semaphore;
/**
* Creates a new fork context.
*/
public function __construct()
{
parent::__construct();
$this->deferred = new Deferred(function (\Exception $exception) {
$this->stop();
});
$this->semaphore = new Semaphore();
}
/**
* Gets the forked process's process ID.
*
* @return int The process ID.
*/
public function getPid()
{
return $this->pid;
}
/**
* {@inheritdoc}
*/
public function isRunning()
{
if (!$this->isChild) {
@ -27,17 +58,11 @@ abstract class ForkContext implements Context
return true;
}
public function join()
{
pcntl_waitpid($this->pid, $status);
}
/**
* {@inheritdoc}
*/
public function start()
{
$deferred = new Deferred(function (\Exception $exception) {
$this->stop();
});
if (($fd = stream_socket_pair(STREAM_PF_UNIX, STREAM_SOCK_STREAM, STREAM_IPPROTO_IP)) === false) {
throw new \Exception();
}
@ -51,19 +76,42 @@ abstract class ForkContext implements Context
Loop\reInit();
// We are the parent, so create a server socket.
if ($pid !== 0) {
// We are the parent, so close the child socket.
$this->pid = $pid;
$this->parentSocket->read(0, "\n")->then(function ($data) use ($deferred) {
print "Got data from worker: $data\n";
$deferred->resolve();
}, function (\Exception $exception) use ($deferred) {
$deferred->reject($exception);
fclose($this->childSocket);
// Wait for the child process to send us a byte over the socket pair
// to discover immediately when the process has completed.
$this->parentSocket->read(1)->then(function ($data) {
$message = ord($data);
if ($message === self::MSG_DONE) {
$this->deferred->resolve();
return;
}
// Get the fatal exception from the process.
return $this->parentSocket->read(2)->then(function ($data) {
list($serializedLength) = unpack('S', $data);
return $this->parentSocket->read($serializedLength);
})->then(function ($data) {
$previous = unserialize($data);
$exception = new ContextAbortException('The context encountered an error.', 0, $previous);
$this->deferred->reject($exception);
$this->parentSocket->close();
});
}, function (\Exception $exception) {
$this->deferred->reject($exception);
});
return $deferred->getPromise();
return;
}
// We are the child, so close the parent socket and initialize child values.
$this->isChild = true;
$this->pid = getmypid();
$this->parentSocket->close();
// We will have a cloned event loop from the parent after forking. The
// child context by default is synchronous and uses the parent event
// loop, so we need to stop the clone before doing any work in case it
@ -71,21 +119,19 @@ abstract class ForkContext implements Context
Loop\clear();
Loop\stop();
$this->pid = getmypid();
// Execute the context runnable and send the parent context the result.
try {
// We are the child, so begin working.
$this->run();
// Let the parent context now that we are done by sending some data.
fwrite($this->childSocket, 'done');
} catch (\Throwable $e) {
fwrite($this->childSocket, 'error');
fwrite($this->childSocket, chr(self::MSG_DONE));
} catch (\Exception $exception) {
fwrite($this->childSocket, chr(self::MSG_ERROR));
$serialized = serialize($exception);
$length = strlen($serialized);
fwrite($this->childSocket, pack('S', $length).$serialized);
} finally {
fclose($this->childSocket);
exit(0);
}
fwrite($this->childSocket, 'done');
fclose($this->childSocket);
exit(0);
}
public function stop()
@ -104,17 +150,46 @@ abstract class ForkContext implements Context
}
}
/**
* {@inheritdoc}
*/
public function join()
{
if ($this->isChild) {
throw new \Exception();
}
return $this->deferred->getPromise();
}
/**
* {@inheritdoc}
*/
public function lock()
{
$this->semaphore->lock();
}
/**
* {@inheritdoc}
*/
public function unlock()
{
$this->semaphore->unlock();
}
public function synchronize(callable $callback)
/**
* {@inheritdoc}
*/
public function synchronized(callable $callback)
{
$this->lock();
$callback($this);
$this->unlock();
}
/**
* {@inheritdoc}
*/
abstract public function run();
}

View File

@ -0,0 +1,57 @@
<?php
namespace Icicle\Concurrent\Forking;
abstract class Synchronizable
{
private $memoryBlock;
private $memoryKey;
public function __construct()
{
$this->memoryKey = abs(crc32(spl_object_hash($this)));
$this->memoryBlock = shm_attach($this->memoryKey, 8192);
if (!is_resource($this->memoryBlock)) {
throw new \Exception();
}
}
public function __isset($name)
{
$key = abs(crc32($name));
return shm_has_var($this->memoryBlock, $key);
}
public function __get($name)
{
$key = abs(crc32($name));
if (shm_has_var($this->memoryBlock, $key)) {
$serialized = shm_get_var($this->memoryBlock, $key);
return unserialize($serialized);
}
}
public function __set($name, $value)
{
$key = abs(crc32($name));
if (!shm_put_var($this->memoryBlock, $key, serialize($value))) {
throw new \Exception();
}
}
public function __unset($name)
{
$key = abs(crc32($name));
if (!shm_remove_var($this->memoryBlock, $key)) {
throw new \Exception();
}
}
public function __destruct()
{
if ($this->memoryBlock) {
if (!shm_remove($this->memoryBlock)) {
throw new \Exception();
}
}
}
}