mirror of
https://github.com/danog/parallel.git
synced 2024-11-26 12:24:40 +01:00
Add support for krakjoe/parallel
This commit is contained in:
parent
398989d41d
commit
ecaf0a854b
@ -26,6 +26,10 @@ install:
|
||||
- if [ "$TRAVIS_PHP_VERSION" != "7.0" ] && [ "$TRAVIS_PHP_VERSION" != "7.1" ]; then
|
||||
travis/install-pthreads.sh;
|
||||
fi
|
||||
# parallel is only supported on PHP 7.1+
|
||||
- if [ "$TRAVIS_PHP_VERSION" != "7.0" ]; then
|
||||
travis/install-parallel.sh;
|
||||
fi
|
||||
- wget https://github.com/php-coveralls/php-coveralls/releases/download/v1.0.2/coveralls.phar
|
||||
- chmod +x coveralls.phar
|
||||
|
||||
|
@ -32,14 +32,12 @@
|
||||
"amphp/phpunit-util": "^1",
|
||||
"amphp/php-cs-fixer-config": "dev-master"
|
||||
},
|
||||
"suggest": {
|
||||
"ext-pthreads": "Required for thread contexts"
|
||||
},
|
||||
"autoload": {
|
||||
"psr-4": {
|
||||
"Amp\\Parallel\\": "lib"
|
||||
},
|
||||
"files": [
|
||||
"lib/Context/Internal/functions.php",
|
||||
"lib/Worker/functions.php"
|
||||
]
|
||||
},
|
||||
|
32
examples/parallel-extension.php
Executable file
32
examples/parallel-extension.php
Executable file
@ -0,0 +1,32 @@
|
||||
#!/usr/bin/env php
|
||||
<?php
|
||||
require \dirname(__DIR__).'/vendor/autoload.php';
|
||||
|
||||
use Amp\Delayed;
|
||||
use Amp\Loop;
|
||||
use Amp\Parallel\Context\Parallel;
|
||||
|
||||
Loop::run(function () {
|
||||
$timer = Loop::repeat(1000, function () {
|
||||
static $i;
|
||||
$i = $i ? ++$i : 1;
|
||||
print "Demonstrating how alive the parent is for the {$i}th time.\n";
|
||||
});
|
||||
|
||||
try {
|
||||
// Create a new child thread that does some blocking stuff.
|
||||
$context = yield Parallel::run(__DIR__ . "/blocking-process.php");
|
||||
|
||||
\assert($context instanceof Parallel);
|
||||
|
||||
print "Waiting 2 seconds to send start data...\n";
|
||||
yield new Delayed(2000);
|
||||
|
||||
yield $context->send("Start data"); // Data sent to child process, received on line 9 of blocking-process.php
|
||||
|
||||
\printf("Received the following from child: %s\n", yield $context->receive()); // Sent on line 14 of blocking-process.php
|
||||
\printf("Process ended with value %d!\n", yield $context->join());
|
||||
} finally {
|
||||
Loop::cancel($timer);
|
||||
}
|
||||
});
|
27
lib/Context/Internal/functions.php
Normal file
27
lib/Context/Internal/functions.php
Normal file
@ -0,0 +1,27 @@
|
||||
<?php
|
||||
|
||||
namespace Amp\Parallel\Context\Internal;
|
||||
|
||||
use Amp\Parallel\Sync\Channel;
|
||||
use Amp\Parallel\Sync\ExitFailure;
|
||||
use Amp\Parallel\Sync\ExitResult;
|
||||
use Amp\Parallel\Sync\SerializationException;
|
||||
use Amp\Promise;
|
||||
use function Amp\call;
|
||||
|
||||
function loadCallable(string $path)
|
||||
{
|
||||
return require $path;
|
||||
}
|
||||
|
||||
function sendResult(Channel $channel, ExitResult $result): Promise
|
||||
{
|
||||
return call(function () use ($channel, $result) {
|
||||
try {
|
||||
yield $channel->send($result);
|
||||
} catch (SerializationException $exception) {
|
||||
// Serializing the result failed. Send the reason why.
|
||||
yield $channel->send(new ExitFailure($exception));
|
||||
}
|
||||
});
|
||||
}
|
336
lib/Context/Parallel.php
Normal file
336
lib/Context/Parallel.php
Normal file
@ -0,0 +1,336 @@
|
||||
<?php
|
||||
|
||||
namespace Amp\Parallel\Context;
|
||||
|
||||
use Amp\Loop;
|
||||
use Amp\Parallel\Sync\ChannelException;
|
||||
use Amp\Parallel\Sync\ChannelledSocket;
|
||||
use Amp\Parallel\Sync\ExitFailure;
|
||||
use Amp\Parallel\Sync\ExitResult;
|
||||
use Amp\Parallel\Sync\ExitSuccess;
|
||||
use Amp\Parallel\Sync\SynchronizationError;
|
||||
use Amp\Promise;
|
||||
use parallel\Exception as ParallelException;
|
||||
use parallel\Runtime;
|
||||
use function Amp\call;
|
||||
|
||||
/**
|
||||
* Implements an execution context using native multi-threading.
|
||||
*
|
||||
* The thread context is not itself threaded. A local instance of the context is
|
||||
* maintained both in the context that creates the thread and in the thread
|
||||
* itself.
|
||||
*/
|
||||
final class Parallel implements Context
|
||||
{
|
||||
const EXIT_CHECK_FREQUENCY = 250;
|
||||
const KEY_LENGTH = 32;
|
||||
|
||||
/** @var string|null */
|
||||
private static $autoloadPath;
|
||||
|
||||
/** @var Internal\ProcessHub */
|
||||
private $hub;
|
||||
|
||||
/** @var Runtime */
|
||||
private $runtime;
|
||||
|
||||
/** @var ChannelledSocket A channel for communicating with the parallel thread. */
|
||||
private $channel;
|
||||
|
||||
/** @var callable */
|
||||
private $script;
|
||||
|
||||
/** @var mixed[] */
|
||||
private $args;
|
||||
|
||||
/** @var int */
|
||||
private $oid = 0;
|
||||
|
||||
/** @var \parallel\Future|null */
|
||||
private $future;
|
||||
|
||||
/**
|
||||
* Checks if threading is enabled.
|
||||
*
|
||||
* @return bool True if threading is enabled, otherwise false.
|
||||
*/
|
||||
public static function isSupported(): bool
|
||||
{
|
||||
return \extension_loaded('parallel');
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates and starts a new thread.
|
||||
*
|
||||
* @param callable $function The callable to invoke in the thread. First argument is an instance of
|
||||
* \Amp\Parallel\Sync\Channel.
|
||||
* @param mixed ...$args Additional arguments to pass to the given callable.
|
||||
*
|
||||
* @return Promise<Thread> The thread object that was spawned.
|
||||
*/
|
||||
public static function run(string $path, ...$args): Promise
|
||||
{
|
||||
$thread = new self($path, ...$args);
|
||||
return call(function () use ($thread) {
|
||||
yield $thread->start();
|
||||
return $thread;
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates a new thread.
|
||||
*
|
||||
* @param callable $function The callable to invoke in the thread. First argument is an instance of
|
||||
* \Amp\Parallel\Sync\Channel.
|
||||
* @param mixed ...$args Additional arguments to pass to the given callable.
|
||||
*
|
||||
* @throws \Error Thrown if the pthreads extension is not available.
|
||||
*/
|
||||
public function __construct(string $script, ...$args)
|
||||
{
|
||||
$this->hub = Loop::getState(self::class);
|
||||
if (!$this->hub instanceof Internal\ProcessHub) {
|
||||
$this->hub = new Internal\ProcessHub;
|
||||
Loop::setState(self::class, $this->hub);
|
||||
}
|
||||
|
||||
if (!self::isSupported()) {
|
||||
throw new \Error("The parallel extension is required to create parallel threads.");
|
||||
}
|
||||
|
||||
$this->script = $script;
|
||||
$this->args = $args;
|
||||
|
||||
if (self::$autoloadPath === null) {
|
||||
$paths = [
|
||||
\dirname(__DIR__, 2) . \DIRECTORY_SEPARATOR . "vendor" . \DIRECTORY_SEPARATOR . "autoload.php",
|
||||
\dirname(__DIR__, 4) . \DIRECTORY_SEPARATOR . "autoload.php",
|
||||
];
|
||||
|
||||
foreach ($paths as $path) {
|
||||
if (\file_exists($path)) {
|
||||
self::$autoloadPath = $path;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if (self::$autoloadPath === null) {
|
||||
throw new \Error("Could not locate autoload.php");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the thread to the condition before starting. The new thread can be started and run independently of the
|
||||
* first thread.
|
||||
*/
|
||||
public function __clone()
|
||||
{
|
||||
$this->runtime = null;
|
||||
$this->channel = null;
|
||||
$this->oid = 0;
|
||||
}
|
||||
|
||||
/**
|
||||
* Kills the thread if it is still running.
|
||||
*
|
||||
* @throws \Amp\Parallel\Context\ContextException
|
||||
*/
|
||||
public function __destruct()
|
||||
{
|
||||
if (\getmypid() === $this->oid) {
|
||||
$this->kill();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Checks if the context is running.
|
||||
*
|
||||
* @return bool True if the context is running, otherwise false.
|
||||
*/
|
||||
public function isRunning(): bool
|
||||
{
|
||||
return $this->channel !== null;
|
||||
}
|
||||
|
||||
/**
|
||||
* Spawns the thread and begins the thread's execution.
|
||||
*
|
||||
* @return Promise<null> Resolved once the thread has started.
|
||||
*
|
||||
* @throws \Amp\Parallel\Context\StatusError If the thread has already been started.
|
||||
* @throws \Amp\Parallel\Context\ContextException If starting the thread was unsuccessful.
|
||||
*/
|
||||
public function start(): Promise
|
||||
{
|
||||
if ($this->oid !== 0) {
|
||||
throw new StatusError('The thread has already been started.');
|
||||
}
|
||||
|
||||
$this->oid = \getmypid();
|
||||
|
||||
$this->runtime = new Runtime(self::$autoloadPath);
|
||||
|
||||
$id = \random_int(0, \PHP_INT_MAX);
|
||||
|
||||
$this->future = $this->runtime->run(static function (string $uri, string $key, string $path, array $arguments): int {
|
||||
\define("AMP_CONTEXT", "parallel");
|
||||
|
||||
if (!$socket = \stream_socket_client($uri, $errno, $errstr, 5, \STREAM_CLIENT_CONNECT)) {
|
||||
\trigger_error("Could not connect to IPC socket", E_USER_ERROR);
|
||||
return 1;
|
||||
}
|
||||
|
||||
$channel = new ChannelledSocket($socket, $socket);
|
||||
|
||||
try {
|
||||
Promise\wait($channel->send($key));
|
||||
} catch (\Throwable $exception) {
|
||||
\trigger_error("Could not send key to parent", E_USER_ERROR);
|
||||
return 1;
|
||||
}
|
||||
|
||||
try {
|
||||
if (!\is_file($path)) {
|
||||
throw new \Error(\sprintf("No script found at '%s' (be sure to provide the full path to the script)", $path));
|
||||
}
|
||||
|
||||
// Protect current scope by requiring script within another function.
|
||||
$callable = Internal\loadCallable($path);
|
||||
|
||||
if (!\is_callable($callable)) {
|
||||
throw new \Error(\sprintf("Script '%s' did not return a callable function", $path));
|
||||
}
|
||||
|
||||
$result = new ExitSuccess(Promise\wait(call($callable, $channel, ...$arguments)));
|
||||
} catch (\Throwable $exception) {
|
||||
$result = new ExitFailure($exception);
|
||||
}
|
||||
|
||||
try {
|
||||
Promise\wait(Internal\sendResult($channel, $result));
|
||||
} catch (\Throwable $exception) {
|
||||
\trigger_error("Could not send result to parent; be sure to shutdown the child before ending the parent", E_USER_ERROR);
|
||||
return 1;
|
||||
}
|
||||
|
||||
return 0;
|
||||
}, [
|
||||
$this->hub->getUri(),
|
||||
$this->hub->generateKey($id, self::KEY_LENGTH),
|
||||
$this->script,
|
||||
$this->args
|
||||
]);
|
||||
|
||||
return call(function () use ($id) {
|
||||
try {
|
||||
$this->channel = yield $this->hub->accept($id);
|
||||
} catch (\Throwable $exception) {
|
||||
$this->close();
|
||||
throw new ContextException("Starting the parallel runtime failed", 0, $exception);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Immediately kills the context.
|
||||
*/
|
||||
public function kill()
|
||||
{
|
||||
$this->close();
|
||||
}
|
||||
|
||||
/**
|
||||
* Closes channel and socket if still open.
|
||||
*/
|
||||
private function close()
|
||||
{
|
||||
if ($this->channel !== null) {
|
||||
$this->channel->close();
|
||||
}
|
||||
|
||||
$this->channel = null;
|
||||
}
|
||||
|
||||
/**
|
||||
* Gets a promise that resolves when the context ends and joins with the
|
||||
* parent context.
|
||||
*
|
||||
* @return \Amp\Promise<mixed>
|
||||
*
|
||||
* @throws StatusError Thrown if the context has not been started.
|
||||
* @throws SynchronizationError Thrown if an exit status object is not received.
|
||||
* @throws ContextException If the context stops responding.
|
||||
*/
|
||||
public function join(): Promise
|
||||
{
|
||||
if ($this->channel == null || $this->runtime === null || $this->future === null) {
|
||||
throw new StatusError('The thread has not been started or has already finished.');
|
||||
}
|
||||
|
||||
return call(function () {
|
||||
try {
|
||||
$response = yield $this->channel->receive();
|
||||
|
||||
if (!$response instanceof ExitResult) {
|
||||
throw new SynchronizationError('Did not receive an exit result from thread.');
|
||||
}
|
||||
} catch (ChannelException $exception) {
|
||||
$this->kill();
|
||||
throw new ContextException(
|
||||
"The context stopped responding, potentially due to a fatal error or calling exit",
|
||||
0,
|
||||
$exception
|
||||
);
|
||||
} catch (\Throwable $exception) {
|
||||
$this->kill();
|
||||
throw $exception;
|
||||
} finally {
|
||||
$this->close();
|
||||
}
|
||||
|
||||
return $response->getResult();
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritdoc}
|
||||
*/
|
||||
public function receive(): Promise
|
||||
{
|
||||
if ($this->channel === null) {
|
||||
throw new StatusError('The process has not been started.');
|
||||
}
|
||||
|
||||
return call(function () {
|
||||
$data = yield $this->channel->receive();
|
||||
|
||||
if ($data instanceof ExitResult) {
|
||||
$data = $data->getResult();
|
||||
throw new SynchronizationError(\sprintf(
|
||||
'Thread process unexpectedly exited with result of type: %s',
|
||||
\is_object($data) ? \get_class($data) : \gettype($data)
|
||||
));
|
||||
}
|
||||
|
||||
return $data;
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritdoc}
|
||||
*/
|
||||
public function send($data): Promise
|
||||
{
|
||||
if ($this->channel === null) {
|
||||
throw new StatusError('The thread has not been started or has already finished.');
|
||||
}
|
||||
|
||||
if ($data instanceof ExitResult) {
|
||||
throw new \Error('Cannot send exit result objects.');
|
||||
}
|
||||
|
||||
return $this->channel->send($data);
|
||||
}
|
||||
}
|
@ -2,7 +2,7 @@
|
||||
|
||||
namespace Amp\Parallel\Worker;
|
||||
|
||||
use Amp\Parallel\Context\Thread;
|
||||
use Amp\Parallel\Context\Parallel;
|
||||
|
||||
/**
|
||||
* The built-in worker factory type.
|
||||
@ -43,10 +43,12 @@ final class DefaultWorkerFactory implements WorkerFactory
|
||||
*/
|
||||
public function create(): Worker
|
||||
{
|
||||
if (Thread::isSupported()) {
|
||||
if (Parallel::isSupported()) {
|
||||
return new WorkerThread($this->className);
|
||||
}
|
||||
|
||||
// Thread context is not use as pthreads is deprecated.
|
||||
|
||||
return new WorkerProcess(
|
||||
$this->className,
|
||||
[],
|
||||
|
31
lib/Worker/Internal/worker-parallel.php
Normal file
31
lib/Worker/Internal/worker-parallel.php
Normal file
@ -0,0 +1,31 @@
|
||||
<?php
|
||||
|
||||
namespace Amp\Parallel\Worker\Internal;
|
||||
|
||||
use Amp\Parallel\Sync;
|
||||
use Amp\Parallel\Worker;
|
||||
use Amp\Promise;
|
||||
|
||||
return function (Sync\Channel $channel, string $className): Promise {
|
||||
if (!\defined("AMP_WORKER")) {
|
||||
\define("AMP_WORKER", "parallel");
|
||||
}
|
||||
|
||||
if (!\class_exists($className)) {
|
||||
throw new \Error(\sprintf("Invalid environment class name '%s'", $className));
|
||||
}
|
||||
|
||||
if (!\is_subclass_of($className, Worker\Environment::class)) {
|
||||
throw new \Error(\sprintf(
|
||||
"The class '%s' does not implement '%s'",
|
||||
$className,
|
||||
Worker\Environment::class
|
||||
));
|
||||
}
|
||||
|
||||
$environment = new $className;
|
||||
|
||||
$runner = new Worker\TaskRunner($channel, $environment);
|
||||
|
||||
return $runner->run();
|
||||
};
|
24
lib/Worker/WorkerParallel.php
Normal file
24
lib/Worker/WorkerParallel.php
Normal file
@ -0,0 +1,24 @@
|
||||
<?php
|
||||
|
||||
namespace Amp\Parallel\Worker;
|
||||
|
||||
use Amp\Parallel\Context\Parallel;
|
||||
|
||||
/**
|
||||
* A worker process that executes task objects.
|
||||
*/
|
||||
final class WorkerParallel extends TaskWorker
|
||||
{
|
||||
const SCRIPT_PATH = __DIR__ . "/Internal/worker-parallel.php";
|
||||
|
||||
/**
|
||||
* @param string $envClassName Name of class implementing \Amp\Parallel\Worker\Environment to instigate.
|
||||
* Defaults to \Amp\Parallel\Worker\BasicEnvironment.
|
||||
*
|
||||
* @throws \Error If the PHP binary path given cannot be found or is not executable.
|
||||
*/
|
||||
public function __construct(string $envClassName = BasicEnvironment::class)
|
||||
{
|
||||
parent::__construct(new Parallel(self::SCRIPT_PATH, $envClassName));
|
||||
}
|
||||
}
|
100
test/Context/ParallelTest.php
Normal file
100
test/Context/ParallelTest.php
Normal file
@ -0,0 +1,100 @@
|
||||
<?php
|
||||
|
||||
namespace Amp\Parallel\Test\Context;
|
||||
|
||||
use Amp\Loop;
|
||||
use Amp\Parallel\Context\Parallel;
|
||||
use Amp\PHPUnit\TestCase;
|
||||
|
||||
class ParallelTest extends TestCase
|
||||
{
|
||||
public function testBasicProcess()
|
||||
{
|
||||
Loop::run(function () {
|
||||
$thread = new Parallel(__DIR__ . "/test-parallel.php", "Test");
|
||||
yield $thread->start();
|
||||
$this->assertSame("Test", yield $thread->join());
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* @expectedException \Amp\Parallel\Sync\PanicError
|
||||
* @expectedExceptionMessage No string provided
|
||||
*/
|
||||
public function testFailingProcess()
|
||||
{
|
||||
Loop::run(function () {
|
||||
$thread = new Parallel(__DIR__ . "/test-process.php");
|
||||
yield $thread->start();
|
||||
yield $thread->join();
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* @expectedException \Amp\Parallel\Sync\PanicError
|
||||
* @expectedExceptionMessage No script found at '../test-process.php'
|
||||
*/
|
||||
public function testInvalidScriptPath()
|
||||
{
|
||||
Loop::run(function () {
|
||||
$thread = new Parallel("../test-process.php");
|
||||
yield $thread->start();
|
||||
yield $thread->join();
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* @expectedException \Amp\Parallel\Sync\PanicError
|
||||
* @expectedExceptionMessage The given data cannot be sent because it is not serializable
|
||||
*/
|
||||
public function testInvalidResult()
|
||||
{
|
||||
Loop::run(function () {
|
||||
$thread = new Parallel(__DIR__ . "/invalid-result-process.php");
|
||||
yield $thread->start();
|
||||
\var_dump(yield $thread->join());
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* @expectedException \Amp\Parallel\Sync\PanicError
|
||||
* @expectedExceptionMessage did not return a callable function
|
||||
*/
|
||||
public function testNoCallbackReturned()
|
||||
{
|
||||
Loop::run(function () {
|
||||
$thread = new Parallel(__DIR__ . "/no-callback-process.php");
|
||||
yield $thread->start();
|
||||
\var_dump(yield $thread->join());
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* @expectedException \Amp\Parallel\Sync\PanicError
|
||||
* @expectedExceptionMessage Uncaught ParseError in execution context
|
||||
*/
|
||||
public function testParseError()
|
||||
{
|
||||
Loop::run(function () {
|
||||
$thread = new Parallel(__DIR__ . "/parse-error-process.inc");
|
||||
yield $thread->start();
|
||||
\var_dump(yield $thread->join());
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* @expectedException \Amp\Parallel\Context\ContextException
|
||||
* @expectedExceptionMessage The context stopped responding, potentially due to a fatal error or calling exit
|
||||
*/
|
||||
public function testKillWhenJoining()
|
||||
{
|
||||
Loop::run(function () {
|
||||
$thread = new Parallel(__DIR__ . "/sleep-process.php");
|
||||
yield $thread->start();
|
||||
$promise = $thread->join();
|
||||
$thread->kill();
|
||||
$this->assertFalse($thread->isRunning());
|
||||
yield $promise;
|
||||
});
|
||||
}
|
||||
}
|
7
test/Context/sleep-parallel.php
Normal file
7
test/Context/sleep-parallel.php
Normal file
@ -0,0 +1,7 @@
|
||||
<?php
|
||||
|
||||
use Amp\Parallel\Sync\Channel;
|
||||
|
||||
return function (Channel $channel, int $time = null) {
|
||||
\sleep($time ?? 1);
|
||||
};
|
7
test/Context/test-parallel.php
Normal file
7
test/Context/test-parallel.php
Normal file
@ -0,0 +1,7 @@
|
||||
<?php
|
||||
|
||||
use Amp\Parallel\Sync\Channel;
|
||||
|
||||
return function (Channel $channel, string $data): string {
|
||||
return $data;
|
||||
};
|
13
test/Worker/WorkerParallelTest.php
Normal file
13
test/Worker/WorkerParallelTest.php
Normal file
@ -0,0 +1,13 @@
|
||||
<?php
|
||||
|
||||
namespace Amp\Parallel\Test\Worker;
|
||||
|
||||
use Amp\Parallel\Worker\WorkerParallel;
|
||||
|
||||
class WorkerParallelTest extends AbstractWorkerTest
|
||||
{
|
||||
protected function createWorker()
|
||||
{
|
||||
return new WorkerParallel;
|
||||
}
|
||||
}
|
10
travis/install-parallel.sh
Executable file
10
travis/install-parallel.sh
Executable file
@ -0,0 +1,10 @@
|
||||
#!/usr/bin/env bash
|
||||
|
||||
git clone https://github.com/krakjoe/parallel;
|
||||
pushd parallel;
|
||||
phpize;
|
||||
./configure;
|
||||
make;
|
||||
make install;
|
||||
popd;
|
||||
echo "extension=parallel.so" >> "$(php -r 'echo php_ini_loaded_file();')"
|
Loading…
Reference in New Issue
Block a user