1
0
mirror of https://github.com/danog/parallel.git synced 2024-11-26 20:34:40 +01:00

Experimental closure-based contexts

This commit is contained in:
coderstephen 2015-07-26 17:53:00 -05:00
parent 966b73a74c
commit 92432ff58b
6 changed files with 142 additions and 91 deletions

View File

@ -5,48 +5,20 @@ use Icicle\Concurrent\Forking\ForkContext;
use Icicle\Coroutine\Coroutine;
use Icicle\Loop;
class Test extends ForkContext
{
/**
* @synchronized
*/
public $data;
public function run()
{
$generator = function () {
$context = new ForkContext(function () {
print "Child sleeping for 4 seconds...\n";
sleep(4);
yield $this->synchronized(function () {
$this->data = 'progress';
});
print "Child sleeping for 2 seconds...\n";
sleep(2);
}
}
$generator = function () {
$context = new Test();
$context->data = 'blank';
});
$context->start();
Loop\timer(1, function () use ($context) {
$context->synchronized(function ($context) {
print "Finally got lock from child!\n";
});
});
$timer = Loop\periodic(1, function () use ($context) {
static $i;
$i = $i + 1 ?: 1;
print "Demonstrating how alive the parent is for the {$i}th time.\n";
if ($context->isRunning()) {
$context->synchronized(function ($context) {
printf("Context data: '%s'\n", $context->data);
});
}
});
try {

View File

@ -4,22 +4,20 @@ require dirname(__DIR__).'/vendor/autoload.php';
use Icicle\Concurrent\Threading\ThreadContext;
use Icicle\Loop;
class Test extends ThreadContext
{
public function run()
{
print "Sleeping for 5 seconds...\n";
sleep(5);
}
}
// Create a periodic message in the main thread.
$timer = Loop\periodic(1, function () {
print "Demonstrating how alive the parent is.\n";
});
$test = new Test();
// Create a new child thread that does some blocking stuff.
$test = new ThreadContext(function () {
print "Sleeping for 5 seconds...\n";
sleep(5);
});
// Run the thread and wait asynchronously for it to finish.
$test->start();
$test->join()->then(function () {
$test->join()->then(function () use ($test) {
print "Thread ended!\n";
Loop\stop();
});

View File

@ -6,6 +6,13 @@ namespace Icicle\Concurrent;
*/
interface ContextInterface extends SynchronizableInterface
{
/**
* Creates a new context with a given function to run.
*
* @return ContextInterface A context instance.
*/
//public static function create(callable $function);
/**
* Checks if the context is running.
*
@ -35,9 +42,4 @@ interface ContextInterface extends SynchronizableInterface
* @return \Icicle\Promise\PromiseInterface Promise that is resolved when the context finishes.
*/
public function join();
/**
* Executes the context's main code.
*/
public function run();
}

View File

@ -11,7 +11,7 @@ use Icicle\Socket\Stream\DuplexStream;
/**
* Implements a UNIX-compatible context using forked processes.
*/
abstract class ForkContext extends Synchronized implements ContextInterface
class ForkContext extends Synchronized implements ContextInterface
{
const MSG_DONE = 1;
const MSG_ERROR = 2;
@ -21,14 +21,19 @@ abstract class ForkContext extends Synchronized implements ContextInterface
private $pid = 0;
private $isChild = false;
private $deferred;
private $function;
/**
* Creates a new fork context.
*
* @param callable $function The function to run in the context.
*/
public function __construct()
public function __construct(callable $function)
{
parent::__construct();
$this->function = $function;
$this->deferred = new Deferred(function (\Exception $exception) {
$this->stop();
});
@ -120,22 +125,7 @@ abstract class ForkContext extends Synchronized implements ContextInterface
Loop\clear();
// Execute the context runnable and send the parent context the result.
try {
$generator = $this->run();
if ($generator instanceof \Generator) {
$coroutine = new Coroutine($generator);
}
Loop\run();
fwrite($this->childSocket, chr(self::MSG_DONE));
} catch (\Exception $exception) {
fwrite($this->childSocket, chr(self::MSG_ERROR));
$serialized = serialize($exception);
$length = strlen($serialized);
fwrite($this->childSocket, pack('S', $length).$serialized);
} finally {
fclose($this->childSocket);
exit(0);
}
$this->run();
}
public function stop()
@ -166,11 +156,6 @@ abstract class ForkContext extends Synchronized implements ContextInterface
return $this->deferred->getPromise();
}
/**
* {@inheritdoc}
*/
abstract public function run();
public function __destruct()
{
parent::__destruct();
@ -181,4 +166,24 @@ abstract class ForkContext extends Synchronized implements ContextInterface
//$this->semaphore->destroy();
}
}
private function run()
{
try {
$generator = call_user_func($this->function);
if ($generator instanceof \Generator) {
$coroutine = new Coroutine($generator);
}
Loop\run();
fwrite($this->childSocket, chr(self::MSG_DONE));
} catch (\Exception $exception) {
fwrite($this->childSocket, chr(self::MSG_ERROR));
$serialized = serialize($exception);
$length = strlen($serialized);
fwrite($this->childSocket, pack('S', $length) . $serialized);
} finally {
fclose($this->childSocket);
exit(0);
}
}
}

View File

@ -1,8 +1,11 @@
<?php
namespace Icicle\Concurrent\Threading;
use Icicle\Coroutine\Coroutine;
use Icicle\Loop;
/**
* A thread object that is used by ThreadContext.
* An internal thread that executes a given function concurrently.
*/
class Thread extends \Thread
{
@ -10,11 +13,31 @@ class Thread extends \Thread
const MSG_ERROR = 2;
private $socket;
private $class;
public function __construct($class)
/**
* @var ThreadContext An instance of the context local to this thread.
*/
public $context;
/**
* @var string|null Path to an autoloader to include.
*/
public $autoloaderPath;
/**
* @var callable The function to execute in the thread.
*/
private $function;
/**
* Creates a new thread object.
*
* @param callable $function The function to execute in the thread.
*/
public function __construct(callable $function)
{
$this->class = $class;
$this->function = $function;
$this->context = ThreadContext::createLocalInstance($this);
}
public function initialize($socket)
@ -24,13 +47,29 @@ class Thread extends \Thread
public function run()
{
$class = $this->class;
$instance = $class::createThreadInstance();
$instance->run();
try {
if (file_exists($this->autoloaderPath)) {
require $this->autoloaderPath;
}
$generator = call_user_func($this->function);
if ($generator instanceof \Generator) {
$coroutine = new Coroutine($generator);
}
Loop\run();
$this->sendMessage(self::MSG_DONE);
} catch (\Exception $exception) {
print $exception . PHP_EOL;
$this->sendMessage(self::MSG_ERROR);
$serialized = serialize($exception);
$length = strlen($serialized);
fwrite($this->socket, pack('S', $length) . $serialized);
} finally {
fclose($this->socket);
}
}
private function sendMessage($message)
{

View File

@ -13,7 +13,7 @@ use Icicle\Socket\Stream\DuplexStream;
* maintained both in the context that creates the thread and in the thread
* itself.
*/
abstract class ThreadContext implements ContextInterface
class ThreadContext implements ContextInterface
{
/**
* @var \Thread A thread instance.
@ -30,22 +30,39 @@ abstract class ThreadContext implements ContextInterface
*/
private $socket;
public static function createThreadInstance()
/**
* @var A reference handle to the invoker.
*/
private $invoker;
/**
* Creates an instance of the current context class for the local thread.
*
* @return self
*
* @internal
*/
final public static function createLocalInstance(Thread $thread)
{
$class = new \ReflectionClass(static::class);
return $class->newInstanceWithoutConstructor();
$instance = $class->newInstanceWithoutConstructor();
$instance->thread = $thread;
return $instance;
}
/**
* Creates a new thread context.
*
* @param callable $function The function to run in the thread.
*/
public function __construct()
public function __construct(callable $function)
{
$this->deferredJoin = new Promise\Deferred(function () {
$this->kill();
});
$this->thread = new Thread(static::class);
$this->thread = new Thread($function);
$this->thread->autoloaderPath = $this->getComposerAutoloader();
}
/**
@ -65,11 +82,8 @@ abstract class ThreadContext implements ContextInterface
throw new \Exception();
}
// When the thread is started, the event loop will be duplicated, so we
// need to start the thread before we add anything else to the event loop
// or we will cause a segmentation fault.
$this->thread->initialize($sockets[1]);
$this->thread->start(PTHREADS_INHERIT_ALL);
$this->thread->start(PTHREADS_INHERIT_INI);
$this->socket = new DuplexStream($sockets[0]);
@ -152,4 +166,25 @@ abstract class ThreadContext implements ContextInterface
return $returnValue;
}
/**
* Gets the full path to the Composer autoloader.
*
* If no Composer autoloader is being used, `null` is returned.
*
* @return \Composer\Autoload\ClassLoader|null
*/
private function getComposerAutoloader()
{
foreach (get_included_files() as $path) {
if (strpos($path, 'vendor/autoload.php') !== false) {
$source = file_get_contents($path);
if (strpos($source, '@generated by Composer') !== false) {
return $path;
}
}
}
return null;
}
}