Rewrite on top of opis/closure

This commit is contained in:
Niklas Keller 2017-12-14 17:33:41 +01:00
parent 43c6fc74c9
commit f945e64f32
4 changed files with 20 additions and 63 deletions

View File

@ -21,7 +21,7 @@
"php": ">=7",
"amphp/parallel": "^0.1",
"amphp/amp": "^2",
"jeremeamia/SuperClosure": "^2.3"
"opis/closure": "^3.0.7"
},
"require-dev": {
"amphp/phpunit-util": "^1.0",

View File

@ -4,20 +4,9 @@ namespace Amp\ParallelFunctions\Internal;
use Amp\Parallel\Worker\Environment;
use Amp\Parallel\Worker\Task;
use SuperClosure\Exception\ClosureUnserializationException;
use SuperClosure\Serializer;
/** @internal */
class ParallelTask implements Task {
const TYPE_SIMPLE = 0;
const TYPE_CLOSURE = 1;
/** @var Serializer */
private static $serializer;
/** @var int */
private $type;
/** @var string */
private $function;
@ -25,30 +14,16 @@ class ParallelTask implements Task {
private $args;
/**
* @param int $type Type of function.
* @param string $function Serialized function.
* @param array $args Arguments to pass to the function. Must be serializable.
*
* @throws ClosureUnserializationException
*/
public function __construct(int $type, string $function, array $args) {
$this->type = $type;
public function __construct(string $function, array $args) {
$this->function = $function;
$this->args = $args;
}
public function run(Environment $environment) {
if (self::$serializer === null) {
static::$serializer = new Serializer;
}
if ($this->type === self::TYPE_SIMPLE) {
$callable = $this->function;
} elseif ($this->type === self::TYPE_CLOSURE) {
$callable = self::$serializer->unserialize($this->function);
} else {
throw \Error('Unsupported parallel task type: ' . $this->type);
}
$callable = \unserialize($this->function, ['allowed_classes' => true]);
return $callable(...$this->args);
}

View File

@ -5,7 +5,7 @@ namespace Amp\ParallelFunctions;
use Amp\MultiReasonException;
use Amp\ParallelFunctions\Internal\ParallelTask;
use Amp\Promise;
use SuperClosure\Serializer;
use Opis\Closure\SerializableClosure;
use function Amp\call;
use function Amp\Parallel\Worker\enqueue;
use function Amp\Promise\any;
@ -19,39 +19,20 @@ use function Amp\Promise\any;
* @throws \Error If the passed callable is not safely serializable.
*/
function parallel(callable $callable): callable {
static $serializer, $errorHandler;
if ($serializer === null) {
$serializer = new Serializer;
try {
if (\is_string($callable)) {
$payload = \serialize($callable);
} elseif ($callable instanceof \Closure) {
$payload = \serialize(new SerializableClosure($callable));
} else {
throw new \Error('Unsupported callable type: ' . \gettype($callable));
}
} catch (\Exception $e) {
throw new \Error('Unsupported callable: ' . $e->getMessage());
}
if (\is_string($callable)) {
$payload = $callable;
$type = ParallelTask::TYPE_SIMPLE;
} elseif ($callable instanceof \Closure) {
if ($errorHandler === null) {
$errorHandler = function ($errno, $errstr) {
if ($errno & \error_reporting()) {
throw new \Error($errstr);
}
};
}
// Set custom error handler because Serializer only issues a notice if serialization fails.
\set_error_handler($errorHandler);
try {
$payload = $serializer->serialize($callable);
$type = ParallelTask::TYPE_CLOSURE;
} finally {
\restore_error_handler();
}
} else {
throw new \Error('Unsupported callable type: ' . \gettype($callable));
}
return function (...$args) use ($type, $payload): Promise {
return enqueue(new ParallelTask($type, $payload, $args));
return function (...$args) use ($payload): Promise {
return enqueue(new ParallelTask($payload, $args));
};
}
@ -62,7 +43,7 @@ function parallel(callable $callable): callable {
* @param callable $callable
*
* @return Promise Resolves to the result once the operation finished.
* @throws \Error
* @throws \Error If the passed callable is not safely serializable.
*/
function parallelMap(array $array, callable $callable): Promise {
return call(function () use ($array, $callable) {
@ -86,6 +67,7 @@ function parallelMap(array $array, callable $callable): Promise {
* @param int $flag
*
* @return Promise
* @throws \Error If the passed callable is not safely serializable.
*/
function parallelFilter(array $array, callable $callable = null, int $flag = 0): Promise {
return call(function () use ($array, $callable, $flag) {

View File

@ -8,12 +8,12 @@ use function Amp\ParallelFunctions\parallel;
class ParallelTest extends TestCase {
/**
* @expectedException \Error
* @expectedExceptionMessage Serialization of closure failed
* @expectedExceptionMessage Unsupported callable: Serialization of 'class@anonymous' is not allowed
*/
public function testUnserializableClosure() {
$unserializable = new class {
};
$callable = parallel(function () use ($unserializable) {
parallel(function () use ($unserializable) {
return 1;
});
}