Use CallableTask from amphp/parallel v1.1

This commit is contained in:
Aaron Piotrowski 2019-01-07 18:14:31 -06:00
parent 12e6c602e0
commit d304ce7a54
No known key found for this signature in database
GPG Key ID: ADD1EF783EDE9EEB
5 changed files with 16 additions and 59 deletions

View File

@ -24,7 +24,7 @@
}, },
"require": { "require": {
"php": ">=7", "php": ">=7",
"amphp/parallel": "^0.1.8 || ^0.2 || ^1", "amphp/parallel": "^1.1",
"amphp/amp": "^2.0.3", "amphp/amp": "^2.0.3",
"opis/closure": "^3.0.7" "opis/closure": "^3.0.7"
}, },

View File

@ -5,6 +5,6 @@ require __DIR__ . '/../vendor/autoload.php';
use function Amp\ParallelFunctions\parallelMap; use function Amp\ParallelFunctions\parallelMap;
use function Amp\Promise\wait; use function Amp\Promise\wait;
// All output in the parallel environment is redirected to STDERR of the parent process automatically. // All output in the parallel environment is redirected to STDOUT/STDERR of the parent process automatically.
// You might notice that the output order varies here when running it multiple times. // You might notice that the output order varies here when running it multiple times.
wait(parallelMap([1, 2, 3], 'var_dump')); wait(parallelMap([1, 2, 3], 'var_dump'));

View File

@ -1,38 +0,0 @@
<?php
namespace Amp\ParallelFunctions\Internal;
use Amp\Parallel\Worker\Environment;
use Amp\Parallel\Worker\Task;
/** @internal */
class ParallelTask implements Task {
/** @var string */
private $function;
/** @var mixed[] */
private $args;
/**
* @param string $function Serialized function.
* @param array $args Arguments to pass to the function. Must be serializable.
*/
public function __construct(string $function, array $args) {
$this->function = $function;
$this->args = $args;
}
public function run(Environment $environment) {
$callable = \unserialize($this->function, ['allowed_classes' => true]);
if ($callable instanceof \__PHP_Incomplete_Class) {
throw new \Error('When using a class instance as a callable, the class must be autoloadable');
}
if (\is_array($callable) && $callable[0] instanceof \__PHP_Incomplete_Class) {
throw new \Error('When using a class instance method as a callable, the class must be autoloadable');
}
return $callable(...$this->args);
}
}

View File

@ -3,6 +3,7 @@
namespace Amp\ParallelFunctions; namespace Amp\ParallelFunctions;
use Amp\MultiReasonException; use Amp\MultiReasonException;
use Amp\Parallel\Worker\CallableTask;
use Amp\Parallel\Worker\Pool; use Amp\Parallel\Worker\Pool;
use Amp\Promise; use Amp\Promise;
use Opis\Closure\SerializableClosure; use Opis\Closure\SerializableClosure;
@ -20,18 +21,12 @@ use function Amp\Promise\any;
* @throws \Error If the passed callable is not safely serializable. * @throws \Error If the passed callable is not safely serializable.
*/ */
function parallel(callable $callable, Pool $pool = null): callable { function parallel(callable $callable, Pool $pool = null): callable {
try {
if ($callable instanceof \Closure) { if ($callable instanceof \Closure) {
$callable = new SerializableClosure($callable); $callable = new SerializableClosure($callable);
} }
$payload = \serialize($callable); return function (...$args) use ($pool, $callable): Promise {
} catch (\Exception $e) { $task = new CallableTask($callable, $args);
throw new \Error('Unsupported callable: ' . $e->getMessage(), 0, $e);
}
return function (...$args) use ($pool, $payload): Promise {
$task = new Internal\ParallelTask($payload, $args);
return $pool ? $pool->enqueue($task) : enqueue($task); return $pool ? $pool->enqueue($task) : enqueue($task);
}; };
} }

View File

@ -2,6 +2,7 @@
namespace Amp\ParallelFunctions\Test; namespace Amp\ParallelFunctions\Test;
use Amp\Parallel\Sync\SerializationException;
use Amp\Parallel\Worker\Pool; use Amp\Parallel\Worker\Pool;
use Amp\ParallelFunctions\Test\Fixture\TestCallables; use Amp\ParallelFunctions\Test\Fixture\TestCallables;
use Amp\PHPUnit\TestCase; use Amp\PHPUnit\TestCase;
@ -21,16 +22,15 @@ class UnserializableClass {
} }
class ParallelTest extends TestCase { class ParallelTest extends TestCase {
/**
* @expectedException \Error
* @expectedExceptionMessage Unsupported callable: Serialization of 'class@anonymous' is not allowed
*/
public function testUnserializableClosure() { public function testUnserializableClosure() {
$this->expectException(SerializationException::class);
$this->expectExceptionMessage('The given data cannot be sent because it is not serializable');
$unserializable = new class { $unserializable = new class {
}; };
parallel(function () use ($unserializable) { Promise\wait(parallel(function () use ($unserializable) {
return 1; return 1;
}); })());
} }
public function testCustomPool() { public function testCustomPool() {
@ -73,15 +73,15 @@ class ParallelTest extends TestCase {
} }
public function testUnserializableCallable() { public function testUnserializableCallable() {
$this->expectException(\Error::class); $this->expectException(SerializationException::class);
$this->expectExceptionMessage("Unsupported callable: Serialization of 'class@anonymous' is not allowed"); $this->expectExceptionMessage("The given data cannot be sent because it is not serializable");
$callable = new class { $callable = new class {
public function __invoke() { public function __invoke() {
} }
}; };
$callable = parallel($callable); Promise\wait(parallel($callable)());
} }
public function testUnserializableClassInstance() { public function testUnserializableClassInstance() {