1
0
mirror of https://github.com/danog/parallel.git synced 2024-11-30 04:39:01 +01:00

Remove ContextInterface & rename forks and threads

This commit is contained in:
coderstephen 2015-08-22 16:27:44 -05:00
parent bc608e5147
commit ad52ac9cf2
8 changed files with 139 additions and 546 deletions

View File

@ -2,12 +2,12 @@
<?php <?php
require dirname(__DIR__).'/vendor/autoload.php'; require dirname(__DIR__).'/vendor/autoload.php';
use Icicle\Concurrent\Forking\ForkContext; use Icicle\Concurrent\Forking\Process;
use Icicle\Coroutine; use Icicle\Coroutine;
use Icicle\Loop; use Icicle\Loop;
Coroutine\create(function () { Coroutine\create(function () {
$context = new ForkContext(function () { $context = Process::spawn(function () {
print "Child sleeping for 4 seconds...\n"; print "Child sleeping for 4 seconds...\n";
sleep(4); sleep(4);
@ -19,8 +19,6 @@ Coroutine\create(function () {
yield 42; yield 42;
}); });
$context->start();
$timer = Loop\periodic(1, function () use ($context) { $timer = Loop\periodic(1, function () use ($context) {
static $i; static $i;
$i = $i ? ++$i : 1; $i = $i ? ++$i : 1;

View File

@ -2,7 +2,7 @@
<?php <?php
require dirname(__DIR__).'/vendor/autoload.php'; require dirname(__DIR__).'/vendor/autoload.php';
use Icicle\Concurrent\Threading\ThreadContext; use Icicle\Concurrent\Threading\Thread;
use Icicle\Coroutine; use Icicle\Coroutine;
use Icicle\Loop; use Icicle\Loop;
@ -14,7 +14,7 @@ $timer = Loop\periodic(1, function () {
Coroutine\create(function () { Coroutine\create(function () {
// Create a new child thread that does some blocking stuff. // Create a new child thread that does some blocking stuff.
$context = new ThreadContext(function () { $context = Thread::spawn(function () {
printf("\$this: %s\n", get_class($this)); printf("\$this: %s\n", get_class($this));
printf("Received the following from parent: %s\n", (yield $this->receive())); printf("Received the following from parent: %s\n", (yield $this->receive()));
@ -36,9 +36,6 @@ Coroutine\create(function () {
yield 42; yield 42;
}); });
// Run the thread and wait asynchronously for it to finish.
$context->start();
yield $context->send('Start data'); yield $context->send('Start data');
$lock = (yield $context->acquire()); $lock = (yield $context->acquire());

View File

@ -1,37 +0,0 @@
<?php
namespace Icicle\Concurrent;
/**
* Interface for all types of execution contexts.
*/
interface ContextInterface extends ChannelInterface
{
/**
* Checks if the context is running.
*
* @return bool True if the context is running, otherwise false.
*/
public function isRunning();
/**
* Starts the context execution.
*/
public function start();
/**
* Immediately kills the context.
*/
public function kill();
/**
* @coroutine
*
* Gets a promise that resolves when the context ends and joins with the
* parent context.
*
* @return \Generator
*
* @resolve mixed Resolved with the return or resolution value of the context once it has completed execution.
*/
public function join();
}

View File

@ -1,228 +0,0 @@
<?php
namespace Icicle\Concurrent\Forking;
use Icicle\Concurrent\ContextInterface;
use Icicle\Concurrent\Exception\ForkException;
use Icicle\Concurrent\Exception\InvalidArgumentError;
use Icicle\Concurrent\Exception\SynchronizationError;
use Icicle\Concurrent\Sync\Channel;
use Icicle\Concurrent\Sync\ChannelInterface;
use Icicle\Concurrent\Sync\ExitFailure;
use Icicle\Concurrent\Sync\ExitStatusInterface;
use Icicle\Concurrent\Sync\ExitSuccess;
use Icicle\Coroutine\Coroutine;
use Icicle\Loop;
/**
* Implements a UNIX-compatible context using forked processes.
*/
class ForkContext implements ContextInterface
{
/**
* @var \Icicle\Concurrent\Sync\Channel A channel for communicating with the child.
*/
private $channel;
/**
* @var int
*/
private $pid = 0;
/**
* @var callable
*/
private $function;
/**
* @var \Icicle\Concurrent\Forking\Synchronized
*/
private $synchronized;
public function __construct(callable $function /* , ...$args */)
{
$this->function = $function;
$this->args = array_slice(func_get_args(), 1);
$this->synchronized = new Synchronized();
}
/**
* Gets the forked process's process ID.
*
* @return int The process ID.
*/
public function getPid()
{
return $this->pid;
}
/**
* {@inheritdoc}
*/
public function isRunning()
{
return posix_getpgid($this->pid) !== false;
}
/**
* {@inheritdoc}
*/
public function start()
{
list($parent, $child) = Channel::createSocketPair();
switch ($pid = pcntl_fork()) {
case -1: // Failure
throw new ForkException('Could not fork process!');
case 0: // Child
// We will have a cloned event loop from the parent after forking. The
// child context by default is synchronous and uses the parent event
// loop, so we need to stop the clone before doing any work in case it
// is already running.
Loop\stop();
Loop\reInit();
Loop\clear();
$channel = new Channel($parent);
fclose($child);
$coroutine = new Coroutine($this->execute($channel));
$coroutine->done();
try {
Loop\run();
} catch (\Exception $exception) {
exit(-1);
}
exit(0);
default: // Parent
$this->pid = $pid;
$this->channel = new Channel($child);
fclose($parent);
}
}
/**
* @coroutine
*
* This method is run only on the child.
*
* @param \Icicle\Concurrent\Sync\ChannelInterface $channel
*
* @return \Generator
*/
private function execute(ChannelInterface $channel)
{
$executor = new ForkExecutor($this->synchronized, $channel);
try {
$function = $this->function;
if ($function instanceof \Closure) {
$function = $function->bindTo($executor, ForkExecutor::class);
}
$result = new ExitSuccess(yield call_user_func_array($function, $this->args));
} catch (\Exception $exception) {
$result = new ExitFailure($exception);
}
try {
yield $channel->send($result);
} finally {
$channel->close();
}
}
/**
* {@inheritdoc}
*/
public function lock()
{
return $this->synchronized->lock();
}
/**
* {@inheritdoc}
*/
public function unlock()
{
return $this->synchronized->unlock();
}
/**
* {@inheritdoc}
*/
public function synchronized(callable $callback)
{
return $this->synchronized->synchronized($callback);
}
/**
* {@inheritdoc}
*/
public function kill()
{
if ($this->isRunning()) {
// forcefully kill the process using SIGKILL
posix_kill($this->getPid(), SIGKILL);
if (null !== $this->channel && $this->channel->isOpen()) {
$this->channel->close();
}
}
}
/**
* {@inheritdoc}
*/
public function join()
{
try {
$response = (yield $this->channel->receive());
if (!$response instanceof ExitStatusInterface) {
throw new SynchronizationError(sprintf(
'Did not receive an exit status from fork. Instead received data of type %s',
is_object($response) ? get_class($response) : gettype($response)
));
}
yield $response->getResult();
} finally {
$this->kill();
}
}
/**
* {@inheritdoc}
*/
public function receive()
{
$data = (yield $this->channel->receive());
if ($data instanceof ExitStatusInterface) {
$data = $data->getResult();
throw new SynchronizationError(sprintf(
'Fork unexpectedly exited with result of type: %s',
is_object($data) ? get_class($data) : gettype($data)
));
}
yield $data;
}
/**
* {@inheritdoc}
*/
public function send($data)
{
if ($data instanceof ExitStatusInterface) {
throw new InvalidArgumentError('Cannot send exit status objects.');
}
yield $this->channel->send($data);
}
}

View File

@ -1,146 +1,172 @@
<?php <?php
namespace Icicle\Concurrent\Threading; namespace Icicle\Concurrent\Threading;
use Icicle\Concurrent\ChannelInterface;
use Icicle\Concurrent\Exception\InvalidArgumentError;
use Icicle\Concurrent\Exception\SynchronizationError;
use Icicle\Concurrent\Sync\Channel; use Icicle\Concurrent\Sync\Channel;
use Icicle\Concurrent\Sync\ChannelInterface; use Icicle\Concurrent\Sync\ExitStatusInterface;
use Icicle\Concurrent\Sync\ExitFailure; use Icicle\Concurrent\Sync\Lock;
use Icicle\Concurrent\Sync\ExitSuccess; use Icicle\Coroutine;
use Icicle\Coroutine\Coroutine;
use Icicle\Loop;
/** /**
* An internal thread that executes a given function concurrently. * Implements an execution context using native multi-threading.
* *
* @internal * The thread context is not itself threaded. A local instance of the context is
* maintained both in the context that creates the thread and in the thread
* itself.
*/ */
class Thread extends \Thread class Thread implements ChannelInterface
{ {
/** /**
* @var callable The function to execute in the thread. * @var \Icicle\Concurrent\Threading\InternalThread An internal thread instance.
*/ */
private $function; private $thread;
/** /**
* @var * @var \Icicle\Concurrent\Sync\Channel A channel for communicating with the thread.
*/ */
private $args; private $channel;
/** /**
* @var resource * Spawns a new thread and runs it.
*/
private $socket;
/**
* @var bool
*/
private $lock = true;
/**
* Creates a new thread object.
* *
* @param resource $socket IPC communication socket. * @param callable $function A callable to invoke in the thread.
* @param callable $function The function to execute in the thread.
* @param mixed[] $args Arguments to pass to the function.
*/
public function __construct($socket, callable $function, array $args = [])
{
$this->function = $function;
$this->args = $args;
$this->socket = $socket;
}
/**
* Runs the thread code and the initialized function.
*/
public function run()
{
/* First thing we need to do is re-initialize the class autoloader. If
* we don't do this first, any object of a class that was loaded after
* the thread started will just be garbage data and unserializable
* values (like resources) will be lost. This happens even with
* thread-safe objects.
*/
foreach (get_declared_classes() as $className) {
if (strpos($className, 'ComposerAutoloaderInit') === 0) {
// Calling getLoader() will register the class loader for us
$className::getLoader();
break;
}
}
// Erase the old event loop inherited from the parent thread and create
// a new one.
Loop\loop(Loop\create());
// At this point, the thread environment has been prepared so begin using the thread.
$channel = new Channel($this->socket);
$coroutine = new Coroutine($this->execute($channel));
$coroutine->done();
Loop\run();
}
/**
* Attempts to obtain the lock. Returns true if the lock was obtained.
* *
* @return bool * @return Thread The thread object that was spawned.
*/ */
public function tsl() public static function spawn(callable $function /* , ...$args */)
{ {
if (!$this->lock) { $thread = new static($function);
return false; $thread->start();
} return $thread;
$this->lock();
try {
if ($this->lock) {
$this->lock = false;
return true;
}
return false;
} finally {
$this->unlock();
}
} }
/** /**
* Releases the lock. * Creates a new thread context from a thread.
*
* @param callable $function
*/ */
public function release() public function __construct(callable $function /* , ...$args */)
{ {
$this->lock = true; $args = array_slice(func_get_args(), 1);
list($channel, $socket) = Channel::createSocketPair();
$this->channel = new Channel($channel);
$this->thread = new InternalThread($socket, $function, $args);
}
/**
* Checks if the context is running.
*
* @return bool True if the context is running, otherwise false.
*/
public function isRunning()
{
return $this->thread->isRunning();
}
/**
* Starts the context execution.
*/
public function start()
{
if ($this->isRunning()) {
throw new SynchronizationError('The thread has already been started.');
}
$this->thread->start(PTHREADS_INHERIT_ALL);
}
/**
* Immediately kills the context.
*/
public function kill()
{
$this->channel->close();
$this->thread->kill();
} }
/** /**
* @coroutine * @coroutine
* *
* @param \Icicle\Concurrent\Sync\ChannelInterface $channel * Gets a promise that resolves when the context ends and joins with the
* parent context.
* *
* @return \Generator * @return \Generator
* *
* @resolve int * @resolve mixed Resolved with the return or resolution value of the context once it has completed execution.
*/ */
private function execute(ChannelInterface $channel) public function join()
{ {
$executor = new ThreadExecutor($this, $channel); if (!$this->isRunning()) {
throw new SynchronizationError('The thread has not been started or has already finished.');
}
try { try {
$function = $this->function; $response = (yield $this->channel->receive());
if ($function instanceof \Closure) {
$function = $function->bindTo($executor, ThreadExecutor::class); if (!$response instanceof ExitStatusInterface) {
throw new SynchronizationError('Did not receive an exit status from thread.');
} }
$result = new ExitSuccess(yield call_user_func_array($function, $this->args)); yield $response->getResult();
} catch (\Exception $exception) {
$result = new ExitFailure($exception);
}
try {
yield $channel->send($result);
} finally { } finally {
$channel->close(); $this->channel->close();
$this->thread->join();
} }
} }
/**
* {@inheritdoc}
*/
public function receive()
{
if (!$this->isRunning()) {
throw new SynchronizationError('The thread has not been started or has already finished.');
}
$data = (yield $this->channel->receive());
if ($data instanceof ExitStatusInterface) {
$data = $data->getResult();
throw new SynchronizationError(sprintf(
'Thread unexpectedly exited with result of type: %s',
is_object($data) ? get_class($data) : gettype($data)
));
}
yield $data;
}
/**
* {@inheritdoc}
*/
public function send($data)
{
if (!$this->isRunning()) {
throw new SynchronizationError('The thread has not been started or has already finished.');
}
if ($data instanceof ExitStatusInterface) {
throw new InvalidArgumentError('Cannot send exit status objects.');
}
return $this->channel->send($data);
}
/**
* {@inheritdoc}
*/
public function acquire()
{
while (!$this->thread->tsl()) {
yield Coroutine\sleep(0.01);
}
yield new Lock(function () {
$this->thread->release();
});
}
} }

View File

@ -1,163 +0,0 @@
<?php
namespace Icicle\Concurrent\Threading;
use Icicle\Concurrent\ContextInterface;
use Icicle\Concurrent\Exception\InvalidArgumentError;
use Icicle\Concurrent\Exception\SynchronizationError;
use Icicle\Concurrent\Sync\Channel;
use Icicle\Concurrent\Sync\ExitStatusInterface;
use Icicle\Concurrent\Sync\Lock;
use Icicle\Coroutine;
/**
* Implements an execution context using native multi-threading.
*
* The thread context is not itself threaded. A local instance of the context is
* maintained both in the context that creates the thread and in the thread
* itself.
*/
class ThreadContext implements ContextInterface
{
/**
* @var \Icicle\Concurrent\Threading\Thread A thread instance.
*/
private $thread;
/**
* @var \Icicle\Concurrent\Sync\Channel A channel for communicating with the thread.
*/
private $channel;
/**
* Spawns a new thread and runs it.
*
* @param callable $function A callable to invoke in the thread.
*
* @return ThreadContext The thread object that was spawned.
*/
public static function spawn(callable $function /* , ...$args */)
{
$thread = new static($function);
$thread->start();
return $thread;
}
/**
* Creates a new thread context from a thread.
*
* @param callable $function
*/
public function __construct(callable $function /* , ...$args */)
{
$args = array_slice(func_get_args(), 1);
list($channel, $socket) = Channel::createSocketPair();
$this->channel = new Channel($channel);
$this->thread = new Thread($socket, $function, $args);
}
/**
* {@inheritdoc}
*/
public function isRunning()
{
return $this->thread->isRunning();
}
/**
* {@inheritdoc}
*/
public function start()
{
if ($this->isRunning()) {
throw new SynchronizationError('The thread has already been started.');
}
$this->thread->start(PTHREADS_INHERIT_ALL);
}
/**
* {@inheritdoc}
*/
public function kill()
{
$this->channel->close();
$this->thread->kill();
}
/**
* {@inheritdoc}
*/
public function join()
{
if (!$this->isRunning()) {
throw new SynchronizationError('The thread has not been started or has already finished.');
}
try {
$response = (yield $this->channel->receive());
if (!$response instanceof ExitStatusInterface) {
throw new SynchronizationError('Did not receive an exit status from thread.');
}
yield $response->getResult();
} finally {
$this->channel->close();
$this->thread->join();
}
}
/**
* {@inheritdoc}
*/
public function receive()
{
if (!$this->isRunning()) {
throw new SynchronizationError('The thread has not been started or has already finished.');
}
$data = (yield $this->channel->receive());
if ($data instanceof ExitStatusInterface) {
$data = $data->getResult();
throw new SynchronizationError(sprintf(
'Thread unexpectedly exited with result of type: %s',
is_object($data) ? get_class($data) : gettype($data)
));
}
yield $data;
}
/**
* {@inheritdoc}
*/
public function send($data)
{
if (!$this->isRunning()) {
throw new SynchronizationError('The thread has not been started or has already finished.');
}
if ($data instanceof ExitStatusInterface) {
throw new InvalidArgumentError('Cannot send exit status objects.');
}
return $this->channel->send($data);
}
/**
* {@inheritdoc}
*/
public function acquire()
{
while (!$this->thread->tsl()) {
yield Coroutine\sleep(0.01);
}
yield new Lock(function () {
$this->thread->release();
});
}
}

View File

@ -11,7 +11,7 @@ use Icicle\Coroutine;
class ThreadExecutor implements ExecutorInterface class ThreadExecutor implements ExecutorInterface
{ {
/** /**
* @var \Icicle\Concurrent\Threading\Thread * @var \Icicle\Concurrent\Threading\InternalThread
*/ */
private $thread; private $thread;
@ -21,10 +21,10 @@ class ThreadExecutor implements ExecutorInterface
private $channel; private $channel;
/** /**
* @param \Icicle\Concurrent\Threading\Thread * @param \Icicle\Concurrent\Threading\InternalThread
* @param \Icicle\Concurrent\Sync\ChannelInterface $channel * @param \Icicle\Concurrent\Sync\ChannelInterface $channel
*/ */
public function __construct(Thread $thread, ChannelInterface $channel) public function __construct(InternalThread $thread, ChannelInterface $channel)
{ {
$this->thread = $thread; $this->thread = $thread;
$this->channel = $channel; $this->channel = $channel;
@ -63,4 +63,4 @@ class ThreadExecutor implements ExecutorInterface
$this->thread->release(); $this->thread->release();
}); });
} }
} }

View File

@ -1,7 +1,7 @@
<?php <?php
namespace Icicle\Concurrent\Worker; namespace Icicle\Concurrent\Worker;
use Icicle\Concurrent\Threading\ThreadContext; use Icicle\Concurrent\Threading\Thread;
use Icicle\Coroutine\Coroutine; use Icicle\Coroutine\Coroutine;
/** /**
@ -19,7 +19,7 @@ class WorkerThread implements WorkerInterface
*/ */
public function __construct() public function __construct()
{ {
$this->thread = new ThreadContext(function () { $this->thread = new Thread(function () {
while (true) { while (true) {
print "Waiting for task...\n"; print "Waiting for task...\n";
$task = (yield $this->receive()); $task = (yield $this->receive());