From f945e64f32ab320a61f4c26855af0dab4f56e3cc Mon Sep 17 00:00:00 2001 From: Niklas Keller Date: Thu, 14 Dec 2017 17:33:41 +0100 Subject: [PATCH] Rewrite on top of opis/closure --- composer.json | 2 +- src/Internal/ParallelTask.php | 29 ++------------------- src/functions.php | 48 +++++++++++------------------------ test/ParallelTest.php | 4 +-- 4 files changed, 20 insertions(+), 63 deletions(-) diff --git a/composer.json b/composer.json index f63eed2..773d526 100644 --- a/composer.json +++ b/composer.json @@ -21,7 +21,7 @@ "php": ">=7", "amphp/parallel": "^0.1", "amphp/amp": "^2", - "jeremeamia/SuperClosure": "^2.3" + "opis/closure": "^3.0.7" }, "require-dev": { "amphp/phpunit-util": "^1.0", diff --git a/src/Internal/ParallelTask.php b/src/Internal/ParallelTask.php index 8cb6a75..8656052 100644 --- a/src/Internal/ParallelTask.php +++ b/src/Internal/ParallelTask.php @@ -4,20 +4,9 @@ namespace Amp\ParallelFunctions\Internal; use Amp\Parallel\Worker\Environment; use Amp\Parallel\Worker\Task; -use SuperClosure\Exception\ClosureUnserializationException; -use SuperClosure\Serializer; /** @internal */ class ParallelTask implements Task { - const TYPE_SIMPLE = 0; - const TYPE_CLOSURE = 1; - - /** @var Serializer */ - private static $serializer; - - /** @var int */ - private $type; - /** @var string */ private $function; @@ -25,30 +14,16 @@ class ParallelTask implements Task { private $args; /** - * @param int $type Type of function. * @param string $function Serialized function. * @param array $args Arguments to pass to the function. Must be serializable. - * - * @throws ClosureUnserializationException */ - public function __construct(int $type, string $function, array $args) { - $this->type = $type; + public function __construct(string $function, array $args) { $this->function = $function; $this->args = $args; } public function run(Environment $environment) { - if (self::$serializer === null) { - static::$serializer = new Serializer; - } - - if ($this->type === self::TYPE_SIMPLE) { - $callable = $this->function; - } elseif ($this->type === self::TYPE_CLOSURE) { - $callable = self::$serializer->unserialize($this->function); - } else { - throw \Error('Unsupported parallel task type: ' . $this->type); - } + $callable = \unserialize($this->function, ['allowed_classes' => true]); return $callable(...$this->args); } diff --git a/src/functions.php b/src/functions.php index 9518229..53fd862 100644 --- a/src/functions.php +++ b/src/functions.php @@ -5,7 +5,7 @@ namespace Amp\ParallelFunctions; use Amp\MultiReasonException; use Amp\ParallelFunctions\Internal\ParallelTask; use Amp\Promise; -use SuperClosure\Serializer; +use Opis\Closure\SerializableClosure; use function Amp\call; use function Amp\Parallel\Worker\enqueue; use function Amp\Promise\any; @@ -19,39 +19,20 @@ use function Amp\Promise\any; * @throws \Error If the passed callable is not safely serializable. */ function parallel(callable $callable): callable { - static $serializer, $errorHandler; - - if ($serializer === null) { - $serializer = new Serializer; + try { + if (\is_string($callable)) { + $payload = \serialize($callable); + } elseif ($callable instanceof \Closure) { + $payload = \serialize(new SerializableClosure($callable)); + } else { + throw new \Error('Unsupported callable type: ' . \gettype($callable)); + } + } catch (\Exception $e) { + throw new \Error('Unsupported callable: ' . $e->getMessage()); } - if (\is_string($callable)) { - $payload = $callable; - $type = ParallelTask::TYPE_SIMPLE; - } elseif ($callable instanceof \Closure) { - if ($errorHandler === null) { - $errorHandler = function ($errno, $errstr) { - if ($errno & \error_reporting()) { - throw new \Error($errstr); - } - }; - } - - // Set custom error handler because Serializer only issues a notice if serialization fails. - \set_error_handler($errorHandler); - - try { - $payload = $serializer->serialize($callable); - $type = ParallelTask::TYPE_CLOSURE; - } finally { - \restore_error_handler(); - } - } else { - throw new \Error('Unsupported callable type: ' . \gettype($callable)); - } - - return function (...$args) use ($type, $payload): Promise { - return enqueue(new ParallelTask($type, $payload, $args)); + return function (...$args) use ($payload): Promise { + return enqueue(new ParallelTask($payload, $args)); }; } @@ -62,7 +43,7 @@ function parallel(callable $callable): callable { * @param callable $callable * * @return Promise Resolves to the result once the operation finished. - * @throws \Error + * @throws \Error If the passed callable is not safely serializable. */ function parallelMap(array $array, callable $callable): Promise { return call(function () use ($array, $callable) { @@ -86,6 +67,7 @@ function parallelMap(array $array, callable $callable): Promise { * @param int $flag * * @return Promise + * @throws \Error If the passed callable is not safely serializable. */ function parallelFilter(array $array, callable $callable = null, int $flag = 0): Promise { return call(function () use ($array, $callable, $flag) { diff --git a/test/ParallelTest.php b/test/ParallelTest.php index 00d6334..390c1e2 100644 --- a/test/ParallelTest.php +++ b/test/ParallelTest.php @@ -8,12 +8,12 @@ use function Amp\ParallelFunctions\parallel; class ParallelTest extends TestCase { /** * @expectedException \Error - * @expectedExceptionMessage Serialization of closure failed + * @expectedExceptionMessage Unsupported callable: Serialization of 'class@anonymous' is not allowed */ public function testUnserializableClosure() { $unserializable = new class { }; - $callable = parallel(function () use ($unserializable) { + parallel(function () use ($unserializable) { return 1; }); }