mirror of
https://github.com/danog/parallel.git
synced 2024-12-03 10:07:49 +01:00
Serialize function arguments
Maybe this is a horrible idea, but we serialize everything else, so why not?
This commit is contained in:
parent
ecaf0a854b
commit
410c88e859
@ -3,13 +3,14 @@
|
|||||||
namespace Amp\Parallel\Context\Internal;
|
namespace Amp\Parallel\Context\Internal;
|
||||||
|
|
||||||
use Amp\Parallel\Sync\Channel;
|
use Amp\Parallel\Sync\Channel;
|
||||||
|
use Amp\Parallel\Sync\ChannelException;
|
||||||
use Amp\Parallel\Sync\ExitFailure;
|
use Amp\Parallel\Sync\ExitFailure;
|
||||||
use Amp\Parallel\Sync\ExitResult;
|
use Amp\Parallel\Sync\ExitResult;
|
||||||
use Amp\Parallel\Sync\SerializationException;
|
use Amp\Parallel\Sync\SerializationException;
|
||||||
use Amp\Promise;
|
use Amp\Promise;
|
||||||
use function Amp\call;
|
use function Amp\call;
|
||||||
|
|
||||||
function loadCallable(string $path)
|
function loadCallable(string $path): callable
|
||||||
{
|
{
|
||||||
return require $path;
|
return require $path;
|
||||||
}
|
}
|
||||||
@ -25,3 +26,16 @@ function sendResult(Channel $channel, ExitResult $result): Promise
|
|||||||
}
|
}
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
function errorHandler($errno, $errstr, $errfile, $errline)
|
||||||
|
{
|
||||||
|
if ($errno & \error_reporting()) {
|
||||||
|
throw new ChannelException(\sprintf(
|
||||||
|
'Received corrupted data. Errno: %d; %s in file %s on line %d',
|
||||||
|
$errno,
|
||||||
|
$errstr,
|
||||||
|
$errfile,
|
||||||
|
$errline
|
||||||
|
));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
@ -2,15 +2,16 @@
|
|||||||
|
|
||||||
namespace Amp\Parallel\Context;
|
namespace Amp\Parallel\Context;
|
||||||
|
|
||||||
|
use Amp\Failure;
|
||||||
use Amp\Loop;
|
use Amp\Loop;
|
||||||
use Amp\Parallel\Sync\ChannelException;
|
use Amp\Parallel\Sync\ChannelException;
|
||||||
use Amp\Parallel\Sync\ChannelledSocket;
|
use Amp\Parallel\Sync\ChannelledSocket;
|
||||||
use Amp\Parallel\Sync\ExitFailure;
|
use Amp\Parallel\Sync\ExitFailure;
|
||||||
use Amp\Parallel\Sync\ExitResult;
|
use Amp\Parallel\Sync\ExitResult;
|
||||||
use Amp\Parallel\Sync\ExitSuccess;
|
use Amp\Parallel\Sync\ExitSuccess;
|
||||||
|
use Amp\Parallel\Sync\SerializationException;
|
||||||
use Amp\Parallel\Sync\SynchronizationError;
|
use Amp\Parallel\Sync\SynchronizationError;
|
||||||
use Amp\Promise;
|
use Amp\Promise;
|
||||||
use parallel\Exception as ParallelException;
|
|
||||||
use parallel\Runtime;
|
use parallel\Runtime;
|
||||||
use function Amp\call;
|
use function Amp\call;
|
||||||
|
|
||||||
@ -32,13 +33,13 @@ final class Parallel implements Context
|
|||||||
/** @var Internal\ProcessHub */
|
/** @var Internal\ProcessHub */
|
||||||
private $hub;
|
private $hub;
|
||||||
|
|
||||||
/** @var Runtime */
|
/** @var Runtime|null */
|
||||||
private $runtime;
|
private $runtime;
|
||||||
|
|
||||||
/** @var ChannelledSocket A channel for communicating with the parallel thread. */
|
/** @var ChannelledSocket|null A channel for communicating with the parallel thread. */
|
||||||
private $channel;
|
private $channel;
|
||||||
|
|
||||||
/** @var callable */
|
/** @var string Script path. */
|
||||||
private $script;
|
private $script;
|
||||||
|
|
||||||
/** @var mixed[] */
|
/** @var mixed[] */
|
||||||
@ -168,13 +169,19 @@ final class Parallel implements Context
|
|||||||
throw new StatusError('The thread has already been started.');
|
throw new StatusError('The thread has already been started.');
|
||||||
}
|
}
|
||||||
|
|
||||||
|
try {
|
||||||
|
$arguments = \serialize($this->args);
|
||||||
|
} catch (\Throwable $exception) {
|
||||||
|
return new Failure(new SerializationException("Arguments must be serializable.", 0, $exception));
|
||||||
|
}
|
||||||
|
|
||||||
$this->oid = \getmypid();
|
$this->oid = \getmypid();
|
||||||
|
|
||||||
$this->runtime = new Runtime(self::$autoloadPath);
|
$this->runtime = new Runtime(self::$autoloadPath);
|
||||||
|
|
||||||
$id = \random_int(0, \PHP_INT_MAX);
|
$id = \random_int(0, \PHP_INT_MAX);
|
||||||
|
|
||||||
$this->future = $this->runtime->run(static function (string $uri, string $key, string $path, array $arguments): int {
|
$this->future = $this->runtime->run(static function (string $uri, string $key, string $path, string $arguments): int {
|
||||||
\define("AMP_CONTEXT", "parallel");
|
\define("AMP_CONTEXT", "parallel");
|
||||||
|
|
||||||
if (!$socket = \stream_socket_client($uri, $errno, $errstr, 5, \STREAM_CLIENT_CONNECT)) {
|
if (!$socket = \stream_socket_client($uri, $errno, $errstr, 5, \STREAM_CLIENT_CONNECT)) {
|
||||||
@ -192,15 +199,32 @@ final class Parallel implements Context
|
|||||||
}
|
}
|
||||||
|
|
||||||
try {
|
try {
|
||||||
|
\set_error_handler(__NAMESPACE__ . '\\Internal\\errorHandler');
|
||||||
|
|
||||||
|
// Attempt to unserialize function arguments
|
||||||
|
try {
|
||||||
|
$arguments = \unserialize($arguments);
|
||||||
|
} catch (\Throwable $exception) {
|
||||||
|
throw new SerializationException("Exception thrown when unserializing data", 0, $exception);
|
||||||
|
} finally {
|
||||||
|
\restore_error_handler();
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!\is_array($arguments)) { // This *should not* be able to happen.
|
||||||
|
throw new \Error("Arguments did not unserialize to an array");
|
||||||
|
}
|
||||||
|
|
||||||
if (!\is_file($path)) {
|
if (!\is_file($path)) {
|
||||||
throw new \Error(\sprintf("No script found at '%s' (be sure to provide the full path to the script)", $path));
|
throw new \Error(\sprintf("No script found at '%s' (be sure to provide the full path to the script)", $path));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
try {
|
||||||
// Protect current scope by requiring script within another function.
|
// Protect current scope by requiring script within another function.
|
||||||
$callable = Internal\loadCallable($path);
|
$callable = Internal\loadCallable($path);
|
||||||
|
} catch (\TypeError $exception) {
|
||||||
if (!\is_callable($callable)) {
|
throw new \Error(\sprintf("Script '%s' did not return a callable function", $path), 0, $exception);
|
||||||
throw new \Error(\sprintf("Script '%s' did not return a callable function", $path));
|
} catch (\ParseError $exception) {
|
||||||
|
throw new \Error(\sprintf("Script '%s' contains a parse error", $path), 0, $exception);
|
||||||
}
|
}
|
||||||
|
|
||||||
$result = new ExitSuccess(Promise\wait(call($callable, $channel, ...$arguments)));
|
$result = new ExitSuccess(Promise\wait(call($callable, $channel, ...$arguments)));
|
||||||
@ -220,14 +244,14 @@ final class Parallel implements Context
|
|||||||
$this->hub->getUri(),
|
$this->hub->getUri(),
|
||||||
$this->hub->generateKey($id, self::KEY_LENGTH),
|
$this->hub->generateKey($id, self::KEY_LENGTH),
|
||||||
$this->script,
|
$this->script,
|
||||||
$this->args
|
$arguments
|
||||||
]);
|
]);
|
||||||
|
|
||||||
return call(function () use ($id) {
|
return call(function () use ($id) {
|
||||||
try {
|
try {
|
||||||
$this->channel = yield $this->hub->accept($id);
|
$this->channel = yield $this->hub->accept($id);
|
||||||
} catch (\Throwable $exception) {
|
} catch (\Throwable $exception) {
|
||||||
$this->close();
|
$this->kill();
|
||||||
throw new ContextException("Starting the parallel runtime failed", 0, $exception);
|
throw new ContextException("Starting the parallel runtime failed", 0, $exception);
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
@ -238,8 +262,14 @@ final class Parallel implements Context
|
|||||||
*/
|
*/
|
||||||
public function kill()
|
public function kill()
|
||||||
{
|
{
|
||||||
|
if ($this->runtime !== null) {
|
||||||
|
try {
|
||||||
|
//$this->runtime->kill();
|
||||||
|
} finally {
|
||||||
$this->close();
|
$this->close();
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Closes channel and socket if still open.
|
* Closes channel and socket if still open.
|
||||||
|
@ -71,7 +71,7 @@ class ParallelTest extends TestCase
|
|||||||
|
|
||||||
/**
|
/**
|
||||||
* @expectedException \Amp\Parallel\Sync\PanicError
|
* @expectedException \Amp\Parallel\Sync\PanicError
|
||||||
* @expectedExceptionMessage Uncaught ParseError in execution context
|
* @expectedExceptionMessageRegExp /Script (.*) contains a parse error/
|
||||||
*/
|
*/
|
||||||
public function testParseError()
|
public function testParseError()
|
||||||
{
|
{
|
||||||
|
Loading…
Reference in New Issue
Block a user