1
0
mirror of https://github.com/danog/parallel.git synced 2024-11-26 20:34:40 +01:00

Add executor contexts, bind to executors instead of channel

This commit is contained in:
Aaron Piotrowski 2015-08-07 15:30:14 -05:00
parent 50444e41e3
commit 1b77f37693
11 changed files with 263 additions and 46 deletions

View File

@ -11,20 +11,24 @@ Coroutine\create(function () {
print "Child sleeping for 4 seconds...\n";
sleep(4);
yield $this->send('Data sent from child.');
print "Child sleeping for 2 seconds...\n";
sleep(2);
return 42;
yield 42;
});
$context->start();
$timer = Loop\periodic(1, function () use ($context) {
static $i;
$i = $i + 1 ?: 1;
$i = $i ? ++$i : 1;
print "Demonstrating how alive the parent is for the {$i}th time.\n";
});
try {
printf("Received the following from child: %s\n", (yield $context->receive()));
printf("Child ended with value %d!\n", (yield $context->join()));
} catch (Exception $e) {
print "Error from child!\n";

View File

@ -7,22 +7,32 @@ use Icicle\Coroutine;
use Icicle\Loop;
$timer = Loop\periodic(1, function () {
print "Demonstrating how alive the parent is.\n";
static $i;
$i = $i ? ++$i : 1;
print "Demonstrating how alive the parent is for the {$i}th time.\n";
});
Coroutine\create(function () {
// Create a periodic message in the main thread.
// Create a new child thread that does some blocking stuff.
$test = new ThreadContext(function () {
print "Sleeping for 5 seconds...\n";
sleep(5);
return 42;
$context = new ThreadContext(function () {
print "Sleeping for 3 seconds...\n";
sleep(3);
yield $this->send('Data sent from child.');
print "Sleeping for 2 seconds...\n";
sleep(2);
yield 42;
});
// Run the thread and wait asynchronously for it to finish.
$test->start();
printf("Thread ended with value %d!\n", (yield $test->join()));
})->done([$timer, 'stop']);
$context->start();
printf("Received the following from child: %s\n", (yield $context->receive()));
printf("Thread ended with value %d!\n", (yield $context->join()));
})->cleanup([$timer, 'stop']);
Loop\run();

24
src/ChannelInterface.php Normal file
View File

@ -0,0 +1,24 @@
<?php
namespace Icicle\Concurrent;
/**
* Interface for execution context within a thread or fork.
*/
interface ChannelInterface
{
/**
* @return \Generator
*
* @resolve mixed
*/
public function receive();
/**
* @param mixed $data
*
* @return \Generator
*
* @resolve int
*/
public function send($data);
}

View File

@ -4,7 +4,7 @@ namespace Icicle\Concurrent;
/**
* Interface for all types of execution contexts.
*/
interface ContextInterface extends SynchronizableInterface
interface ContextInterface extends SynchronizableInterface, ChannelInterface
{
/**
* Checks if the context is running.
@ -23,22 +23,6 @@ interface ContextInterface extends SynchronizableInterface
*/
public function kill();
/**
* @return \Generator
*
* @resolve mixed
*/
public function receive();
/**
* @param mixed $data
*
* @return \Generator
*
* @resolve int
*/
public function send($data);
/**
* Gets a promise that resolves when the context ends and joins with the
* parent context.

13
src/ExecutorInterface.php Normal file
View File

@ -0,0 +1,13 @@
<?php
namespace Icicle\Concurrent;
/**
* Interface for execution context within a thread or fork.
*/
interface ExecutorInterface extends SynchronizableInterface, ChannelInterface
{
/**
* @return \Icicle\Promise\PromiseInterface
*/
public function close();
}

View File

@ -4,6 +4,7 @@ namespace Icicle\Concurrent\Forking;
use Icicle\Concurrent\ContextInterface;
use Icicle\Concurrent\Exception\ForkException;
use Icicle\Concurrent\Exception\SynchronizationError;
use Icicle\Concurrent\ExecutorInterface;
use Icicle\Concurrent\Sync\Channel;
use Icicle\Concurrent\Sync\ExitFailure;
use Icicle\Concurrent\Sync\ExitInterface;
@ -15,7 +16,7 @@ use Icicle\Promise;
/**
* Implements a UNIX-compatible context using forked processes.
*/
class ForkContext extends Synchronized implements ContextInterface
class ForkContext implements ContextInterface
{
/**
* @var \Icicle\Concurrent\Sync\Channel A channel for communicating with the child.
@ -32,12 +33,17 @@ class ForkContext extends Synchronized implements ContextInterface
*/
private $function;
/**
* @var \Icicle\Concurrent\Forking\Synchronized
*/
private $synchronized;
public function __construct(callable $function /* , ...$args */)
{
parent::__construct();
$this->function = $function;
$this->args = array_slice(func_get_args(), 1);
$this->synchronized = new Synchronized();
}
/**
@ -81,9 +87,11 @@ class ForkContext extends Synchronized implements ContextInterface
$channel = new Channel($parent);
fclose($child);
$executor = new ForkExecutor($this->synchronized, $channel);
// Execute the context runnable and send the parent context the result.
try {
Promise\wait(new Coroutine($this->execute($channel)));
Promise\wait(new Coroutine($this->execute($executor)));
} catch (\Exception $exception) {
exit(-1);
}
@ -98,16 +106,16 @@ class ForkContext extends Synchronized implements ContextInterface
}
/**
* @param Channel $channel
* @param \Icicle\Concurrent\ExecutorInterface
*
* @return \Generator
*/
private function execute(Channel $channel)
private function execute(ExecutorInterface $executor)
{
try {
$function = $this->function;
if ($function instanceof \Closure) {
$function = $function->bindTo($channel, Channel::class);
$function = $function->bindTo($executor, ForkExecutor::class);
}
$result = new ExitSuccess(yield call_user_func_array($function, $this->args));
@ -115,9 +123,33 @@ class ForkContext extends Synchronized implements ContextInterface
$result = new ExitFailure($exception);
}
yield $channel->send($result);
yield $executor->send($result);
$channel->close();
yield $executor->close();
}
/**
* {@inheritdoc}
*/
public function lock()
{
return $this->synchronized->lock();
}
/**
* {@inheritdoc}
*/
public function unlock()
{
return $this->synchronized->unlock();
}
/**
* {@inheritdoc}
*/
public function synchronized(callable $callback)
{
return $this->synchronized->synchronized($callback);
}
/**

View File

@ -0,0 +1,72 @@
<?php
namespace Icicle\Concurrent\Forking;
use Icicle\Concurrent\ExecutorInterface;
use Icicle\Concurrent\Sync\Channel;
class ForkExecutor implements ExecutorInterface
{
/**
* @var \Icicle\Concurrent\Forking\Synchronized
*/
private $synchronized;
/**
* @var \Icicle\Concurrent\Sync\Channel
*/
private $channel;
public function __construct(Synchronized $synchronized, Channel $channel)
{
$this->synchronized = $synchronized;
$this->channel = $channel;
}
/**
* {@inheritdoc}
*/
public function receive()
{
return $this->channel->receive();
}
/**
* {@inheritdoc}
*/
public function send($data)
{
return $this->channel->send($data);
}
/**
* {@inheritdoc}
*/
public function close()
{
return $this->channel->close();
}
/**
* {@inheritdoc}
*/
public function lock()
{
$this->synchronized->lock();
}
/**
* {@inheritdoc}
*/
public function unlock()
{
$this->synchronized->unlock();
}
/**
* {@inheritdoc}
*/
public function synchronized(callable $callback)
{
return $this->synchronized->synchronized($callback);
}
}

View File

@ -11,7 +11,7 @@ use Icicle\Concurrent\Sync\AsyncSemaphore;
* When used with forking, the object must be created prior to forking for both
* processes to access the synchronized object.
*/
abstract class Synchronized extends SharedObject implements SynchronizableInterface
class Synchronized extends SharedObject implements SynchronizableInterface
{
/**
* @var AsyncSemaphore A semaphore used for locking the object data.

View File

@ -1,6 +1,7 @@
<?php
namespace Icicle\Concurrent\Sync;
use Icicle\Concurrent\ChannelInterface;
use Icicle\Concurrent\Exception\ChannelException;
use Icicle\Socket\Stream\DuplexStream;
@ -13,7 +14,7 @@ use Icicle\Socket\Stream\DuplexStream;
* channel object can be safely transferred between threads up until the point
* that the channel is used.
*/
class Channel
class Channel implements ChannelInterface
{
const MESSAGE_CLOSE = 1;
const MESSAGE_DATA = 2;

View File

@ -1,6 +1,7 @@
<?php
namespace Icicle\Concurrent\Threading;
use Icicle\Concurrent\ExecutorInterface;
use Icicle\Concurrent\Sync\Channel;
use Icicle\Concurrent\Sync\ExitFailure;
use Icicle\Concurrent\Sync\ExitSuccess;
@ -104,23 +105,23 @@ class Thread extends \Thread
// At this point, the thread environment has been prepared, and the
// parent has finished injecting values into our memory, so begin using
// the channel.
$channel = new Channel($this->socket);
// the thread.
$executor = new ThreadExecutor($this, new Channel($this->socket));
Promise\wait(new Coroutine($this->execute($channel)));
Promise\wait(new Coroutine($this->execute($executor)));
}
/**
* @param Channel $channel
* @param \Icicle\Concurrent\ExecutorInterface
*
* @return \Generator
*/
private function execute(Channel $channel)
private function execute(ExecutorInterface $executor)
{
try {
$function = $this->function;
if ($function instanceof \Closure) {
$function = $function->bindTo($channel, Channel::class);
$function = $function->bindTo($executor, ThreadExecutor::class);
}
$result = new ExitSuccess(yield call_user_func_array($function, $this->args));
@ -128,8 +129,8 @@ class Thread extends \Thread
$result = new ExitFailure($exception);
}
yield $channel->send($result);
yield $executor->send($result);
$channel->close();
yield $executor->close();
}
}

View File

@ -0,0 +1,76 @@
<?php
namespace Icicle\Concurrent\Threading;
use Icicle\Concurrent\ExecutorInterface;
use Icicle\Concurrent\Sync\Channel;
class ThreadExecutor implements ExecutorInterface
{
/**
* @var \Icicle\Concurrent\Threading\Thread
*/
private $thread;
/**
* @var \Icicle\Concurrent\ChannelInterface
*/
private $channel;
/**
* @param \Icicle\Concurrent\Threading\Thread
* @param \Icicle\Concurrent\Sync\Channel $channel
*/
public function __construct(Thread $thread, Channel $channel)
{
$this->thread = $thread;
$this->channel = $channel;
}
/**
* {@inheritdoc}
*/
public function receive()
{
return $this->channel->receive();
}
/**
* {@inheritdoc}
*/
public function send($data)
{
return $this->channel->send($data);
}
/**
* {@inheritdoc}
*/
public function close()
{
return $this->channel->close();
}
/**
* {@inheritdoc}
*/
public function lock()
{
return $this->thread->lock();
}
/**
* {@inheritdoc}
*/
public function unlock()
{
return $this->thread->unlock();
}
/**
* {@inheritdoc}
*/
public function synchronized(callable $callback)
{
return $this->thread->synchronized($callback);
}
}