mirror of
https://github.com/danog/parallel.git
synced 2025-01-22 14:01:14 +01:00
Update for krakjoe/parallel changes
This commit is contained in:
parent
410c88e859
commit
2d35051391
@ -37,7 +37,6 @@
|
|||||||
"Amp\\Parallel\\": "lib"
|
"Amp\\Parallel\\": "lib"
|
||||||
},
|
},
|
||||||
"files": [
|
"files": [
|
||||||
"lib/Context/Internal/functions.php",
|
|
||||||
"lib/Worker/functions.php"
|
"lib/Worker/functions.php"
|
||||||
]
|
]
|
||||||
},
|
},
|
||||||
|
79
lib/Context/Internal/ParallelRunner.php
Normal file
79
lib/Context/Internal/ParallelRunner.php
Normal file
@ -0,0 +1,79 @@
|
|||||||
|
<?php
|
||||||
|
|
||||||
|
namespace Amp\Parallel\Context\Internal;
|
||||||
|
|
||||||
|
use Amp\Parallel\Sync\Channel;
|
||||||
|
use Amp\Parallel\Sync\ChannelException;
|
||||||
|
use Amp\Parallel\Sync\ExitFailure;
|
||||||
|
use Amp\Parallel\Sync\ExitSuccess;
|
||||||
|
use Amp\Parallel\Sync\SerializationException;
|
||||||
|
use Amp\Promise;
|
||||||
|
use function Amp\call;
|
||||||
|
|
||||||
|
final class ParallelRunner
|
||||||
|
{
|
||||||
|
public static function execute(Channel $channel, string $path, string $arguments): int
|
||||||
|
{
|
||||||
|
try {
|
||||||
|
\set_error_handler(function ($errno, $errstr, $errfile, $errline) {
|
||||||
|
if ($errno & \error_reporting()) {
|
||||||
|
throw new ChannelException(\sprintf(
|
||||||
|
'Received corrupted data. Errno: %d; %s in file %s on line %d',
|
||||||
|
$errno,
|
||||||
|
$errstr,
|
||||||
|
$errfile,
|
||||||
|
$errline
|
||||||
|
));
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
// Attempt to unserialize function arguments
|
||||||
|
try {
|
||||||
|
$arguments = \unserialize($arguments);
|
||||||
|
} catch (\Throwable $exception) {
|
||||||
|
throw new SerializationException("Exception thrown when unserializing data", 0, $exception);
|
||||||
|
} finally {
|
||||||
|
\restore_error_handler();
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!\is_array($arguments)) { // This *should not* be able to happen.
|
||||||
|
throw new \Error("Arguments did not unserialize to an array");
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!\is_file($path)) {
|
||||||
|
throw new \Error(\sprintf("No script found at '%s' (be sure to provide the full path to the script)", $path));
|
||||||
|
}
|
||||||
|
|
||||||
|
try {
|
||||||
|
// Protect current scope by requiring script within another function.
|
||||||
|
$callable = (function () use ($path): callable {
|
||||||
|
return require $path;
|
||||||
|
})();
|
||||||
|
} catch (\TypeError $exception) {
|
||||||
|
throw new \Error(\sprintf("Script '%s' did not return a callable function", $path), 0, $exception);
|
||||||
|
} catch (\ParseError $exception) {
|
||||||
|
throw new \Error(\sprintf("Script '%s' contains a parse error", $path), 0, $exception);
|
||||||
|
}
|
||||||
|
|
||||||
|
$result = new ExitSuccess(Promise\wait(call($callable, $channel, ...$arguments)));
|
||||||
|
} catch (\Throwable $exception) {
|
||||||
|
$result = new ExitFailure($exception);
|
||||||
|
}
|
||||||
|
|
||||||
|
try {
|
||||||
|
Promise\wait(call(function () use ($channel, $result) {
|
||||||
|
try {
|
||||||
|
yield $channel->send($result);
|
||||||
|
} catch (SerializationException $exception) {
|
||||||
|
// Serializing the result failed. Send the reason why.
|
||||||
|
yield $channel->send(new ExitFailure($exception));
|
||||||
|
}
|
||||||
|
}));
|
||||||
|
} catch (\Throwable $exception) {
|
||||||
|
\trigger_error("Could not send result to parent; be sure to shutdown the child before ending the parent", E_USER_ERROR);
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
}
|
@ -1,41 +0,0 @@
|
|||||||
<?php
|
|
||||||
|
|
||||||
namespace Amp\Parallel\Context\Internal;
|
|
||||||
|
|
||||||
use Amp\Parallel\Sync\Channel;
|
|
||||||
use Amp\Parallel\Sync\ChannelException;
|
|
||||||
use Amp\Parallel\Sync\ExitFailure;
|
|
||||||
use Amp\Parallel\Sync\ExitResult;
|
|
||||||
use Amp\Parallel\Sync\SerializationException;
|
|
||||||
use Amp\Promise;
|
|
||||||
use function Amp\call;
|
|
||||||
|
|
||||||
function loadCallable(string $path): callable
|
|
||||||
{
|
|
||||||
return require $path;
|
|
||||||
}
|
|
||||||
|
|
||||||
function sendResult(Channel $channel, ExitResult $result): Promise
|
|
||||||
{
|
|
||||||
return call(function () use ($channel, $result) {
|
|
||||||
try {
|
|
||||||
yield $channel->send($result);
|
|
||||||
} catch (SerializationException $exception) {
|
|
||||||
// Serializing the result failed. Send the reason why.
|
|
||||||
yield $channel->send(new ExitFailure($exception));
|
|
||||||
}
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
function errorHandler($errno, $errstr, $errfile, $errline)
|
|
||||||
{
|
|
||||||
if ($errno & \error_reporting()) {
|
|
||||||
throw new ChannelException(\sprintf(
|
|
||||||
'Received corrupted data. Errno: %d; %s in file %s on line %d',
|
|
||||||
$errno,
|
|
||||||
$errstr,
|
|
||||||
$errfile,
|
|
||||||
$errline
|
|
||||||
));
|
|
||||||
}
|
|
||||||
}
|
|
@ -6,12 +6,11 @@ use Amp\Failure;
|
|||||||
use Amp\Loop;
|
use Amp\Loop;
|
||||||
use Amp\Parallel\Sync\ChannelException;
|
use Amp\Parallel\Sync\ChannelException;
|
||||||
use Amp\Parallel\Sync\ChannelledSocket;
|
use Amp\Parallel\Sync\ChannelledSocket;
|
||||||
use Amp\Parallel\Sync\ExitFailure;
|
|
||||||
use Amp\Parallel\Sync\ExitResult;
|
use Amp\Parallel\Sync\ExitResult;
|
||||||
use Amp\Parallel\Sync\ExitSuccess;
|
|
||||||
use Amp\Parallel\Sync\SerializationException;
|
use Amp\Parallel\Sync\SerializationException;
|
||||||
use Amp\Parallel\Sync\SynchronizationError;
|
use Amp\Parallel\Sync\SynchronizationError;
|
||||||
use Amp\Promise;
|
use Amp\Promise;
|
||||||
|
use parallel\Exception as ParallelException;
|
||||||
use parallel\Runtime;
|
use parallel\Runtime;
|
||||||
use function Amp\call;
|
use function Amp\call;
|
||||||
|
|
||||||
@ -198,48 +197,7 @@ final class Parallel implements Context
|
|||||||
return 1;
|
return 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
try {
|
return Internal\ParallelRunner::execute($channel, $path, $arguments);
|
||||||
\set_error_handler(__NAMESPACE__ . '\\Internal\\errorHandler');
|
|
||||||
|
|
||||||
// Attempt to unserialize function arguments
|
|
||||||
try {
|
|
||||||
$arguments = \unserialize($arguments);
|
|
||||||
} catch (\Throwable $exception) {
|
|
||||||
throw new SerializationException("Exception thrown when unserializing data", 0, $exception);
|
|
||||||
} finally {
|
|
||||||
\restore_error_handler();
|
|
||||||
}
|
|
||||||
|
|
||||||
if (!\is_array($arguments)) { // This *should not* be able to happen.
|
|
||||||
throw new \Error("Arguments did not unserialize to an array");
|
|
||||||
}
|
|
||||||
|
|
||||||
if (!\is_file($path)) {
|
|
||||||
throw new \Error(\sprintf("No script found at '%s' (be sure to provide the full path to the script)", $path));
|
|
||||||
}
|
|
||||||
|
|
||||||
try {
|
|
||||||
// Protect current scope by requiring script within another function.
|
|
||||||
$callable = Internal\loadCallable($path);
|
|
||||||
} catch (\TypeError $exception) {
|
|
||||||
throw new \Error(\sprintf("Script '%s' did not return a callable function", $path), 0, $exception);
|
|
||||||
} catch (\ParseError $exception) {
|
|
||||||
throw new \Error(\sprintf("Script '%s' contains a parse error", $path), 0, $exception);
|
|
||||||
}
|
|
||||||
|
|
||||||
$result = new ExitSuccess(Promise\wait(call($callable, $channel, ...$arguments)));
|
|
||||||
} catch (\Throwable $exception) {
|
|
||||||
$result = new ExitFailure($exception);
|
|
||||||
}
|
|
||||||
|
|
||||||
try {
|
|
||||||
Promise\wait(Internal\sendResult($channel, $result));
|
|
||||||
} catch (\Throwable $exception) {
|
|
||||||
\trigger_error("Could not send result to parent; be sure to shutdown the child before ending the parent", E_USER_ERROR);
|
|
||||||
return 1;
|
|
||||||
}
|
|
||||||
|
|
||||||
return 0;
|
|
||||||
}, [
|
}, [
|
||||||
$this->hub->getUri(),
|
$this->hub->getUri(),
|
||||||
$this->hub->generateKey($id, self::KEY_LENGTH),
|
$this->hub->generateKey($id, self::KEY_LENGTH),
|
||||||
@ -264,7 +222,9 @@ final class Parallel implements Context
|
|||||||
{
|
{
|
||||||
if ($this->runtime !== null) {
|
if ($this->runtime !== null) {
|
||||||
try {
|
try {
|
||||||
//$this->runtime->kill();
|
$this->runtime->kill();
|
||||||
|
} catch (ParallelException $exception) {
|
||||||
|
// Ignore runtime being unusable since we're killing it anyway.
|
||||||
} finally {
|
} finally {
|
||||||
$this->close();
|
$this->close();
|
||||||
}
|
}
|
||||||
|
7
test/Context/Fixtures/sleep-parallel.php
Normal file
7
test/Context/Fixtures/sleep-parallel.php
Normal file
@ -0,0 +1,7 @@
|
|||||||
|
<?php
|
||||||
|
|
||||||
|
use Amp\Parallel\Sync\Channel;
|
||||||
|
|
||||||
|
return function (Channel $channel, int $time = 1) {
|
||||||
|
\sleep($time);
|
||||||
|
};
|
@ -11,7 +11,7 @@ class ParallelTest extends TestCase
|
|||||||
public function testBasicProcess()
|
public function testBasicProcess()
|
||||||
{
|
{
|
||||||
Loop::run(function () {
|
Loop::run(function () {
|
||||||
$thread = new Parallel(__DIR__ . "/test-parallel.php", "Test");
|
$thread = new Parallel(__DIR__ . "/Fixtures/test-parallel.php", "Test");
|
||||||
yield $thread->start();
|
yield $thread->start();
|
||||||
$this->assertSame("Test", yield $thread->join());
|
$this->assertSame("Test", yield $thread->join());
|
||||||
});
|
});
|
||||||
@ -24,7 +24,7 @@ class ParallelTest extends TestCase
|
|||||||
public function testFailingProcess()
|
public function testFailingProcess()
|
||||||
{
|
{
|
||||||
Loop::run(function () {
|
Loop::run(function () {
|
||||||
$thread = new Parallel(__DIR__ . "/test-process.php");
|
$thread = new Parallel(__DIR__ . "/Fixtures/test-process.php");
|
||||||
yield $thread->start();
|
yield $thread->start();
|
||||||
yield $thread->join();
|
yield $thread->join();
|
||||||
});
|
});
|
||||||
@ -50,7 +50,7 @@ class ParallelTest extends TestCase
|
|||||||
public function testInvalidResult()
|
public function testInvalidResult()
|
||||||
{
|
{
|
||||||
Loop::run(function () {
|
Loop::run(function () {
|
||||||
$thread = new Parallel(__DIR__ . "/invalid-result-process.php");
|
$thread = new Parallel(__DIR__ . "/Fixtures/invalid-result-process.php");
|
||||||
yield $thread->start();
|
yield $thread->start();
|
||||||
\var_dump(yield $thread->join());
|
\var_dump(yield $thread->join());
|
||||||
});
|
});
|
||||||
@ -63,7 +63,7 @@ class ParallelTest extends TestCase
|
|||||||
public function testNoCallbackReturned()
|
public function testNoCallbackReturned()
|
||||||
{
|
{
|
||||||
Loop::run(function () {
|
Loop::run(function () {
|
||||||
$thread = new Parallel(__DIR__ . "/no-callback-process.php");
|
$thread = new Parallel(__DIR__ . "/Fixtures/no-callback-process.php");
|
||||||
yield $thread->start();
|
yield $thread->start();
|
||||||
\var_dump(yield $thread->join());
|
\var_dump(yield $thread->join());
|
||||||
});
|
});
|
||||||
@ -76,7 +76,7 @@ class ParallelTest extends TestCase
|
|||||||
public function testParseError()
|
public function testParseError()
|
||||||
{
|
{
|
||||||
Loop::run(function () {
|
Loop::run(function () {
|
||||||
$thread = new Parallel(__DIR__ . "/parse-error-process.inc");
|
$thread = new Parallel(__DIR__ . "/Fixtures/parse-error-process.inc");
|
||||||
yield $thread->start();
|
yield $thread->start();
|
||||||
\var_dump(yield $thread->join());
|
\var_dump(yield $thread->join());
|
||||||
});
|
});
|
||||||
@ -89,7 +89,7 @@ class ParallelTest extends TestCase
|
|||||||
public function testKillWhenJoining()
|
public function testKillWhenJoining()
|
||||||
{
|
{
|
||||||
Loop::run(function () {
|
Loop::run(function () {
|
||||||
$thread = new Parallel(__DIR__ . "/sleep-process.php");
|
$thread = new Parallel(__DIR__ . "/Fixtures/sleep-parallel.php");
|
||||||
yield $thread->start();
|
yield $thread->start();
|
||||||
$promise = $thread->join();
|
$promise = $thread->join();
|
||||||
$thread->kill();
|
$thread->kill();
|
||||||
|
@ -1,7 +0,0 @@
|
|||||||
<?php
|
|
||||||
|
|
||||||
use Amp\Parallel\Sync\Channel;
|
|
||||||
|
|
||||||
return function (Channel $channel, int $time = null) {
|
|
||||||
\sleep($time ?? 1);
|
|
||||||
};
|
|
Loading…
x
Reference in New Issue
Block a user