mirror of
https://github.com/danog/parallel-functions.git
synced 2024-11-30 04:39:03 +01:00
Allow any serializable callable
This commit is contained in:
parent
27e3657820
commit
0b63981159
@ -25,6 +25,14 @@ class ParallelTask implements Task {
|
|||||||
public function run(Environment $environment) {
|
public function run(Environment $environment) {
|
||||||
$callable = \unserialize($this->function, ['allowed_classes' => true]);
|
$callable = \unserialize($this->function, ['allowed_classes' => true]);
|
||||||
|
|
||||||
|
if ($callable instanceof \__PHP_Incomplete_Class) {
|
||||||
|
throw new \Error('When using a class instance as a callable, the class must be autoloadable');
|
||||||
|
}
|
||||||
|
|
||||||
|
if (\is_array($callable) && $callable[0] instanceof \__PHP_Incomplete_Class) {
|
||||||
|
throw new \Error('When using a class instance method as a callable, the class must be autoloadable');
|
||||||
|
}
|
||||||
|
|
||||||
return $callable(...$this->args);
|
return $callable(...$this->args);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -4,7 +4,6 @@ namespace Amp\ParallelFunctions;
|
|||||||
|
|
||||||
use Amp\MultiReasonException;
|
use Amp\MultiReasonException;
|
||||||
use Amp\Parallel\Worker\Pool;
|
use Amp\Parallel\Worker\Pool;
|
||||||
use Amp\ParallelFunctions\Internal\ParallelTask;
|
|
||||||
use Amp\Promise;
|
use Amp\Promise;
|
||||||
use Opis\Closure\SerializableClosure;
|
use Opis\Closure\SerializableClosure;
|
||||||
use function Amp\call;
|
use function Amp\call;
|
||||||
@ -22,19 +21,17 @@ use function Amp\Promise\any;
|
|||||||
*/
|
*/
|
||||||
function parallel(callable $callable, Pool $pool = null): callable {
|
function parallel(callable $callable, Pool $pool = null): callable {
|
||||||
try {
|
try {
|
||||||
if (\is_string($callable)) {
|
if ($callable instanceof \Closure) {
|
||||||
$payload = \serialize($callable);
|
$callable = new SerializableClosure($callable);
|
||||||
} elseif ($callable instanceof \Closure) {
|
|
||||||
$payload = \serialize(new SerializableClosure($callable));
|
|
||||||
} else {
|
|
||||||
throw new \Error('Unsupported callable type: ' . \gettype($callable));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
$payload = \serialize($callable);
|
||||||
} catch (\Exception $e) {
|
} catch (\Exception $e) {
|
||||||
throw new \Error('Unsupported callable: ' . $e->getMessage());
|
throw new \Error('Unsupported callable: ' . $e->getMessage(), 0, $e);
|
||||||
}
|
}
|
||||||
|
|
||||||
return function (...$args) use ($pool, $payload): Promise {
|
return function (...$args) use ($pool, $payload): Promise {
|
||||||
$task = new ParallelTask($payload, $args);
|
$task = new Internal\ParallelTask($payload, $args);
|
||||||
return $pool ? $pool->enqueue($task) : enqueue($task);
|
return $pool ? $pool->enqueue($task) : enqueue($task);
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
17
test/Fixture/TestCallables.php
Normal file
17
test/Fixture/TestCallables.php
Normal file
@ -0,0 +1,17 @@
|
|||||||
|
<?php
|
||||||
|
|
||||||
|
namespace Amp\ParallelFunctions\Test\Fixture;
|
||||||
|
|
||||||
|
class TestCallables {
|
||||||
|
public static function staticMethod(int $value): int {
|
||||||
|
return $value + 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
public function instanceMethod(int $value): int {
|
||||||
|
return $value + 2;
|
||||||
|
}
|
||||||
|
|
||||||
|
public function __invoke(int $value) {
|
||||||
|
return $value + 3;
|
||||||
|
}
|
||||||
|
}
|
@ -3,11 +3,23 @@
|
|||||||
namespace Amp\ParallelFunctions\Test;
|
namespace Amp\ParallelFunctions\Test;
|
||||||
|
|
||||||
use Amp\Parallel\Worker\Pool;
|
use Amp\Parallel\Worker\Pool;
|
||||||
|
use Amp\ParallelFunctions\Test\Fixture\TestCallables;
|
||||||
use Amp\PHPUnit\TestCase;
|
use Amp\PHPUnit\TestCase;
|
||||||
use Amp\Promise;
|
use Amp\Promise;
|
||||||
use Amp\Success;
|
use Amp\Success;
|
||||||
use function Amp\ParallelFunctions\parallel;
|
use function Amp\ParallelFunctions\parallel;
|
||||||
|
|
||||||
|
class UnserializableClass {
|
||||||
|
public function __invoke() {
|
||||||
|
}
|
||||||
|
|
||||||
|
public function instanceMethod() {
|
||||||
|
}
|
||||||
|
|
||||||
|
public static function staticMethod() {
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
class ParallelTest extends TestCase {
|
class ParallelTest extends TestCase {
|
||||||
/**
|
/**
|
||||||
* @expectedException \Error
|
* @expectedException \Error
|
||||||
@ -33,4 +45,75 @@ class ParallelTest extends TestCase {
|
|||||||
|
|
||||||
$this->assertSame(1, Promise\wait($callable()));
|
$this->assertSame(1, Promise\wait($callable()));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public function testClassStaticMethod() {
|
||||||
|
$callable = [TestCallables::class, 'staticMethod'];
|
||||||
|
$result = $callable(1);
|
||||||
|
$callable = parallel($callable);
|
||||||
|
|
||||||
|
$this->assertSame($result, Promise\wait($callable(1)));
|
||||||
|
}
|
||||||
|
|
||||||
|
public function testClassInstanceMethod() {
|
||||||
|
$instance = new TestCallables;
|
||||||
|
|
||||||
|
$callable = [$instance, 'instanceMethod'];
|
||||||
|
$result = $callable(1);
|
||||||
|
$callable = parallel($callable);
|
||||||
|
|
||||||
|
$this->assertSame($result, Promise\wait($callable(1)));
|
||||||
|
}
|
||||||
|
|
||||||
|
public function testCallableClass() {
|
||||||
|
$callable = new TestCallables;
|
||||||
|
$result = $callable(1);
|
||||||
|
$callable = parallel($callable);
|
||||||
|
|
||||||
|
$this->assertSame($result, Promise\wait($callable(1)));
|
||||||
|
}
|
||||||
|
|
||||||
|
public function testUnserializableCallable() {
|
||||||
|
$this->expectException(\Error::class);
|
||||||
|
$this->expectExceptionMessage("Unsupported callable: Serialization of 'class@anonymous' is not allowed");
|
||||||
|
|
||||||
|
$callable = new class {
|
||||||
|
public function __invoke() {
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
$callable = parallel($callable);
|
||||||
|
}
|
||||||
|
|
||||||
|
public function testUnserializableClassInstance() {
|
||||||
|
$this->expectException(\Error::class);
|
||||||
|
$this->expectExceptionMessage('Uncaught Error in worker with message "When using a class instance as a callable, the class must be autoloadable"');
|
||||||
|
|
||||||
|
$callable = new UnserializableClass;
|
||||||
|
|
||||||
|
$callable = parallel($callable);
|
||||||
|
|
||||||
|
Promise\wait($callable());
|
||||||
|
}
|
||||||
|
|
||||||
|
public function testUnserializableClassInstanceMethod() {
|
||||||
|
$this->expectException(\Error::class);
|
||||||
|
$this->expectExceptionMessage('Uncaught Error in worker with message "When using a class instance method as a callable, the class must be autoloadable"');
|
||||||
|
|
||||||
|
$callable = [new UnserializableClass, 'instanceMethod'];
|
||||||
|
|
||||||
|
$callable = parallel($callable);
|
||||||
|
|
||||||
|
Promise\wait($callable());
|
||||||
|
}
|
||||||
|
|
||||||
|
public function testUnserializableClassStaticMethod() {
|
||||||
|
$this->expectException(\Error::class);
|
||||||
|
$this->expectExceptionMessage('Uncaught Error in worker with message "Class \'Amp\\ParallelFunctions\\Test\\UnserializableClass\' not found"');
|
||||||
|
|
||||||
|
$callable = [UnserializableClass::class, 'staticMethod'];
|
||||||
|
|
||||||
|
$callable = parallel($callable);
|
||||||
|
|
||||||
|
Promise\wait($callable());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user