From ad52ac9cf2562daf80d15d7608236b72ef6520a5 Mon Sep 17 00:00:00 2001 From: coderstephen Date: Sat, 22 Aug 2015 16:27:44 -0500 Subject: [PATCH] Remove ContextInterface & rename forks and threads --- examples/fork.php | 6 +- examples/thread.php | 7 +- src/ContextInterface.php | 37 ----- src/Forking/ForkContext.php | 228 ------------------------------ src/Threading/Thread.php | 232 +++++++++++++++++-------------- src/Threading/ThreadContext.php | 163 ---------------------- src/Threading/ThreadExecutor.php | 8 +- src/Worker/WorkerThread.php | 4 +- 8 files changed, 139 insertions(+), 546 deletions(-) delete mode 100644 src/ContextInterface.php delete mode 100644 src/Forking/ForkContext.php delete mode 100644 src/Threading/ThreadContext.php diff --git a/examples/fork.php b/examples/fork.php index 5bcc643..7419a5f 100755 --- a/examples/fork.php +++ b/examples/fork.php @@ -2,12 +2,12 @@ start(); - $timer = Loop\periodic(1, function () use ($context) { static $i; $i = $i ? ++$i : 1; diff --git a/examples/thread.php b/examples/thread.php index a4af471..0175d01 100755 --- a/examples/thread.php +++ b/examples/thread.php @@ -2,7 +2,7 @@ receive())); @@ -36,9 +36,6 @@ Coroutine\create(function () { yield 42; }); - // Run the thread and wait asynchronously for it to finish. - $context->start(); - yield $context->send('Start data'); $lock = (yield $context->acquire()); diff --git a/src/ContextInterface.php b/src/ContextInterface.php deleted file mode 100644 index c7fb0c3..0000000 --- a/src/ContextInterface.php +++ /dev/null @@ -1,37 +0,0 @@ -function = $function; - $this->args = array_slice(func_get_args(), 1); - - $this->synchronized = new Synchronized(); - } - - /** - * Gets the forked process's process ID. - * - * @return int The process ID. - */ - public function getPid() - { - return $this->pid; - } - - /** - * {@inheritdoc} - */ - public function isRunning() - { - return posix_getpgid($this->pid) !== false; - } - - /** - * {@inheritdoc} - */ - public function start() - { - list($parent, $child) = Channel::createSocketPair(); - - switch ($pid = pcntl_fork()) { - case -1: // Failure - throw new ForkException('Could not fork process!'); - - case 0: // Child - // We will have a cloned event loop from the parent after forking. The - // child context by default is synchronous and uses the parent event - // loop, so we need to stop the clone before doing any work in case it - // is already running. - Loop\stop(); - Loop\reInit(); - Loop\clear(); - - $channel = new Channel($parent); - fclose($child); - - $coroutine = new Coroutine($this->execute($channel)); - $coroutine->done(); - - try { - Loop\run(); - } catch (\Exception $exception) { - exit(-1); - } - - exit(0); - - default: // Parent - $this->pid = $pid; - $this->channel = new Channel($child); - fclose($parent); - } - } - - /** - * @coroutine - * - * This method is run only on the child. - * - * @param \Icicle\Concurrent\Sync\ChannelInterface $channel - * - * @return \Generator - */ - private function execute(ChannelInterface $channel) - { - $executor = new ForkExecutor($this->synchronized, $channel); - - try { - $function = $this->function; - if ($function instanceof \Closure) { - $function = $function->bindTo($executor, ForkExecutor::class); - } - - $result = new ExitSuccess(yield call_user_func_array($function, $this->args)); - } catch (\Exception $exception) { - $result = new ExitFailure($exception); - } - - try { - yield $channel->send($result); - } finally { - $channel->close(); - } - } - - /** - * {@inheritdoc} - */ - public function lock() - { - return $this->synchronized->lock(); - } - - /** - * {@inheritdoc} - */ - public function unlock() - { - return $this->synchronized->unlock(); - } - - /** - * {@inheritdoc} - */ - public function synchronized(callable $callback) - { - return $this->synchronized->synchronized($callback); - } - - /** - * {@inheritdoc} - */ - public function kill() - { - if ($this->isRunning()) { - // forcefully kill the process using SIGKILL - posix_kill($this->getPid(), SIGKILL); - - if (null !== $this->channel && $this->channel->isOpen()) { - $this->channel->close(); - } - } - } - - /** - * {@inheritdoc} - */ - public function join() - { - try { - $response = (yield $this->channel->receive()); - - if (!$response instanceof ExitStatusInterface) { - throw new SynchronizationError(sprintf( - 'Did not receive an exit status from fork. Instead received data of type %s', - is_object($response) ? get_class($response) : gettype($response) - )); - } - - yield $response->getResult(); - } finally { - $this->kill(); - } - } - - /** - * {@inheritdoc} - */ - public function receive() - { - $data = (yield $this->channel->receive()); - - if ($data instanceof ExitStatusInterface) { - $data = $data->getResult(); - throw new SynchronizationError(sprintf( - 'Fork unexpectedly exited with result of type: %s', - is_object($data) ? get_class($data) : gettype($data) - )); - } - - yield $data; - } - - /** - * {@inheritdoc} - */ - public function send($data) - { - if ($data instanceof ExitStatusInterface) { - throw new InvalidArgumentError('Cannot send exit status objects.'); - } - - yield $this->channel->send($data); - } -} diff --git a/src/Threading/Thread.php b/src/Threading/Thread.php index 37e4dfe..e0215a9 100644 --- a/src/Threading/Thread.php +++ b/src/Threading/Thread.php @@ -1,146 +1,172 @@ function = $function; - $this->args = $args; - $this->socket = $socket; - } - - /** - * Runs the thread code and the initialized function. - */ - public function run() - { - /* First thing we need to do is re-initialize the class autoloader. If - * we don't do this first, any object of a class that was loaded after - * the thread started will just be garbage data and unserializable - * values (like resources) will be lost. This happens even with - * thread-safe objects. - */ - foreach (get_declared_classes() as $className) { - if (strpos($className, 'ComposerAutoloaderInit') === 0) { - // Calling getLoader() will register the class loader for us - $className::getLoader(); - break; - } - } - - // Erase the old event loop inherited from the parent thread and create - // a new one. - Loop\loop(Loop\create()); - - // At this point, the thread environment has been prepared so begin using the thread. - $channel = new Channel($this->socket); - - $coroutine = new Coroutine($this->execute($channel)); - $coroutine->done(); - - Loop\run(); - } - - /** - * Attempts to obtain the lock. Returns true if the lock was obtained. + * @param callable $function A callable to invoke in the thread. * - * @return bool + * @return Thread The thread object that was spawned. */ - public function tsl() + public static function spawn(callable $function /* , ...$args */) { - if (!$this->lock) { - return false; - } - - $this->lock(); - - try { - if ($this->lock) { - $this->lock = false; - return true; - } - return false; - } finally { - $this->unlock(); - } + $thread = new static($function); + $thread->start(); + return $thread; } /** - * Releases the lock. + * Creates a new thread context from a thread. + * + * @param callable $function */ - public function release() + public function __construct(callable $function /* , ...$args */) { - $this->lock = true; + $args = array_slice(func_get_args(), 1); + + list($channel, $socket) = Channel::createSocketPair(); + + $this->channel = new Channel($channel); + $this->thread = new InternalThread($socket, $function, $args); + } + + /** + * Checks if the context is running. + * + * @return bool True if the context is running, otherwise false. + */ + public function isRunning() + { + return $this->thread->isRunning(); + } + + /** + * Starts the context execution. + */ + public function start() + { + if ($this->isRunning()) { + throw new SynchronizationError('The thread has already been started.'); + } + + $this->thread->start(PTHREADS_INHERIT_ALL); + } + + /** + * Immediately kills the context. + */ + public function kill() + { + $this->channel->close(); + $this->thread->kill(); } /** * @coroutine * - * @param \Icicle\Concurrent\Sync\ChannelInterface $channel + * Gets a promise that resolves when the context ends and joins with the + * parent context. * * @return \Generator * - * @resolve int + * @resolve mixed Resolved with the return or resolution value of the context once it has completed execution. */ - private function execute(ChannelInterface $channel) + public function join() { - $executor = new ThreadExecutor($this, $channel); + if (!$this->isRunning()) { + throw new SynchronizationError('The thread has not been started or has already finished.'); + } try { - $function = $this->function; - if ($function instanceof \Closure) { - $function = $function->bindTo($executor, ThreadExecutor::class); + $response = (yield $this->channel->receive()); + + if (!$response instanceof ExitStatusInterface) { + throw new SynchronizationError('Did not receive an exit status from thread.'); } - $result = new ExitSuccess(yield call_user_func_array($function, $this->args)); - } catch (\Exception $exception) { - $result = new ExitFailure($exception); - } - - try { - yield $channel->send($result); + yield $response->getResult(); } finally { - $channel->close(); + $this->channel->close(); + $this->thread->join(); } } + + /** + * {@inheritdoc} + */ + public function receive() + { + if (!$this->isRunning()) { + throw new SynchronizationError('The thread has not been started or has already finished.'); + } + + $data = (yield $this->channel->receive()); + + if ($data instanceof ExitStatusInterface) { + $data = $data->getResult(); + throw new SynchronizationError(sprintf( + 'Thread unexpectedly exited with result of type: %s', + is_object($data) ? get_class($data) : gettype($data) + )); + } + + yield $data; + } + + /** + * {@inheritdoc} + */ + public function send($data) + { + if (!$this->isRunning()) { + throw new SynchronizationError('The thread has not been started or has already finished.'); + } + + if ($data instanceof ExitStatusInterface) { + throw new InvalidArgumentError('Cannot send exit status objects.'); + } + + return $this->channel->send($data); + } + + /** + * {@inheritdoc} + */ + public function acquire() + { + while (!$this->thread->tsl()) { + yield Coroutine\sleep(0.01); + } + + yield new Lock(function () { + $this->thread->release(); + }); + } } diff --git a/src/Threading/ThreadContext.php b/src/Threading/ThreadContext.php deleted file mode 100644 index ff938d2..0000000 --- a/src/Threading/ThreadContext.php +++ /dev/null @@ -1,163 +0,0 @@ -start(); - return $thread; - } - - /** - * Creates a new thread context from a thread. - * - * @param callable $function - */ - public function __construct(callable $function /* , ...$args */) - { - $args = array_slice(func_get_args(), 1); - - list($channel, $socket) = Channel::createSocketPair(); - - $this->channel = new Channel($channel); - $this->thread = new Thread($socket, $function, $args); - } - - /** - * {@inheritdoc} - */ - public function isRunning() - { - return $this->thread->isRunning(); - } - - /** - * {@inheritdoc} - */ - public function start() - { - if ($this->isRunning()) { - throw new SynchronizationError('The thread has already been started.'); - } - - $this->thread->start(PTHREADS_INHERIT_ALL); - } - - /** - * {@inheritdoc} - */ - public function kill() - { - $this->channel->close(); - $this->thread->kill(); - } - - /** - * {@inheritdoc} - */ - public function join() - { - if (!$this->isRunning()) { - throw new SynchronizationError('The thread has not been started or has already finished.'); - } - - try { - $response = (yield $this->channel->receive()); - - if (!$response instanceof ExitStatusInterface) { - throw new SynchronizationError('Did not receive an exit status from thread.'); - } - - yield $response->getResult(); - } finally { - $this->channel->close(); - $this->thread->join(); - } - } - - /** - * {@inheritdoc} - */ - public function receive() - { - if (!$this->isRunning()) { - throw new SynchronizationError('The thread has not been started or has already finished.'); - } - - $data = (yield $this->channel->receive()); - - if ($data instanceof ExitStatusInterface) { - $data = $data->getResult(); - throw new SynchronizationError(sprintf( - 'Thread unexpectedly exited with result of type: %s', - is_object($data) ? get_class($data) : gettype($data) - )); - } - - yield $data; - } - - /** - * {@inheritdoc} - */ - public function send($data) - { - if (!$this->isRunning()) { - throw new SynchronizationError('The thread has not been started or has already finished.'); - } - - if ($data instanceof ExitStatusInterface) { - throw new InvalidArgumentError('Cannot send exit status objects.'); - } - - return $this->channel->send($data); - } - - /** - * {@inheritdoc} - */ - public function acquire() - { - while (!$this->thread->tsl()) { - yield Coroutine\sleep(0.01); - } - - yield new Lock(function () { - $this->thread->release(); - }); - } -} diff --git a/src/Threading/ThreadExecutor.php b/src/Threading/ThreadExecutor.php index 2871029..632be39 100644 --- a/src/Threading/ThreadExecutor.php +++ b/src/Threading/ThreadExecutor.php @@ -11,7 +11,7 @@ use Icicle\Coroutine; class ThreadExecutor implements ExecutorInterface { /** - * @var \Icicle\Concurrent\Threading\Thread + * @var \Icicle\Concurrent\Threading\InternalThread */ private $thread; @@ -21,10 +21,10 @@ class ThreadExecutor implements ExecutorInterface private $channel; /** - * @param \Icicle\Concurrent\Threading\Thread + * @param \Icicle\Concurrent\Threading\InternalThread * @param \Icicle\Concurrent\Sync\ChannelInterface $channel */ - public function __construct(Thread $thread, ChannelInterface $channel) + public function __construct(InternalThread $thread, ChannelInterface $channel) { $this->thread = $thread; $this->channel = $channel; @@ -63,4 +63,4 @@ class ThreadExecutor implements ExecutorInterface $this->thread->release(); }); } -} \ No newline at end of file +} diff --git a/src/Worker/WorkerThread.php b/src/Worker/WorkerThread.php index 3698e08..c9437ae 100644 --- a/src/Worker/WorkerThread.php +++ b/src/Worker/WorkerThread.php @@ -1,7 +1,7 @@ thread = new ThreadContext(function () { + $this->thread = new Thread(function () { while (true) { print "Waiting for task...\n"; $task = (yield $this->receive());