1
0
mirror of https://github.com/danog/parallel.git synced 2025-01-22 14:01:14 +01:00

Use closure within parallel runner

This commit is contained in:
Aaron Piotrowski 2019-05-01 13:30:57 -05:00
parent 2d02d964f4
commit 1acd54e848
No known key found for this signature in database
GPG Key ID: ADD1EF783EDE9EEB
2 changed files with 39 additions and 60 deletions

View File

@ -1,59 +0,0 @@
<?php
namespace Amp\Parallel\Context\Internal;
use Amp\Loop;
use Amp\Parallel\Sync\Channel;
use Amp\Parallel\Sync\ExitFailure;
use Amp\Parallel\Sync\ExitSuccess;
use Amp\Parallel\Sync\SerializationException;
use Amp\Promise;
use function Amp\call;
/**
* @codeCoverageIgnore Only executed in thread.
*/
final class ParallelRunner
{
const EXIT_CHECK_FREQUENCY = 250;
public static function run(Channel $channel, string $path, array $argv): void
{
Loop::unreference(Loop::repeat(self::EXIT_CHECK_FREQUENCY, function () {
// Timer to give the chance for the PHP VM to be interrupted by Runtime::kill(), since system calls such as
// select() will not be interrupted.
}));
try {
if (!\is_file($path)) {
throw new \Error(\sprintf("No script found at '%s' (be sure to provide the full path to the script)", $path));
}
$argc = \array_unshift($argv, $path);
try {
// Protect current scope by requiring script within another function.
$callable = (function () use ($argc, $argv): callable { // Using $argc so it is available to the required script.
return require $argv[0];
})();
} catch (\TypeError $exception) {
throw new \Error(\sprintf("Script '%s' did not return a callable function", $path), 0, $exception);
} catch (\ParseError $exception) {
throw new \Error(\sprintf("Script '%s' contains a parse error", $path), 0, $exception);
}
$result = new ExitSuccess(Promise\wait(call($callable, $channel)));
} catch (\Throwable $exception) {
$result = new ExitFailure($exception);
}
Promise\wait(call(function () use ($channel, $result) {
try {
yield $channel->send($result);
} catch (SerializationException $exception) {
// Serializing the result failed. Send the reason why.
yield $channel->send(new ExitFailure($exception));
}
}));
}
}

View File

@ -4,7 +4,10 @@ namespace Amp\Parallel\Context;
use Amp\Loop;
use Amp\Parallel\Sync\ChannelledSocket;
use Amp\Parallel\Sync\ExitFailure;
use Amp\Parallel\Sync\ExitResult;
use Amp\Parallel\Sync\ExitSuccess;
use Amp\Parallel\Sync\SerializationException;
use Amp\Parallel\Sync\SynchronizationError;
use Amp\Promise;
use parallel\Runtime;
@ -196,7 +199,42 @@ final class Parallel implements Context
}
try {
Internal\ParallelRunner::run($channel, $path, $argv);
Loop::unreference(Loop::repeat(self::EXIT_CHECK_FREQUENCY, function () {
// Timer to give the chance for the PHP VM to be interrupted by Runtime::kill(), since system calls such as
// select() will not be interrupted.
}));
try {
if (!\is_file($path)) {
throw new \Error(\sprintf("No script found at '%s' (be sure to provide the full path to the script)", $path));
}
$argc = \array_unshift($argv, $path);
try {
// Protect current scope by requiring script within another function.
$callable = (function () use ($argc, $argv): callable { // Using $argc so it is available to the required script.
return require $argv[0];
})();
} catch (\TypeError $exception) {
throw new \Error(\sprintf("Script '%s' did not return a callable function", $path), 0, $exception);
} catch (\ParseError $exception) {
throw new \Error(\sprintf("Script '%s' contains a parse error", $path), 0, $exception);
}
$result = new ExitSuccess(Promise\wait(call($callable, $channel)));
} catch (\Throwable $exception) {
$result = new ExitFailure($exception);
}
Promise\wait(call(function () use ($channel, $result) {
try {
yield $channel->send($result);
} catch (SerializationException $exception) {
// Serializing the result failed. Send the reason why.
yield $channel->send(new ExitFailure($exception));
}
}));
} catch (\Throwable $exception) {
\trigger_error("Could not send result to parent; be sure to shutdown the child before ending the parent", E_USER_ERROR);
return 1;