mirror of
https://github.com/danog/parallel-functions.git
synced 2024-12-02 09:37:56 +01:00
Serialize callable immediately
This largely reverts d304ce7a54
. Too bad, I was hoping to reuse CallableTask.
This commit is contained in:
parent
d304ce7a54
commit
6d033dbd67
38
src/Internal/SerializedCallableTask.php
Normal file
38
src/Internal/SerializedCallableTask.php
Normal file
@ -0,0 +1,38 @@
|
|||||||
|
<?php
|
||||||
|
|
||||||
|
namespace Amp\ParallelFunctions\Internal;
|
||||||
|
|
||||||
|
use Amp\Parallel\Worker\Environment;
|
||||||
|
use Amp\Parallel\Worker\Task;
|
||||||
|
|
||||||
|
/** @internal */
|
||||||
|
class SerializedCallableTask implements Task {
|
||||||
|
/** @var string */
|
||||||
|
private $function;
|
||||||
|
|
||||||
|
/** @var mixed[] */
|
||||||
|
private $args;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @param string $function Serialized function.
|
||||||
|
* @param array $args Arguments to pass to the function. Must be serializable.
|
||||||
|
*/
|
||||||
|
public function __construct(string $function, array $args) {
|
||||||
|
$this->function = $function;
|
||||||
|
$this->args = $args;
|
||||||
|
}
|
||||||
|
|
||||||
|
public function run(Environment $environment) {
|
||||||
|
$callable = \unserialize($this->function, ['allowed_classes' => true]);
|
||||||
|
|
||||||
|
if ($callable instanceof \__PHP_Incomplete_Class) {
|
||||||
|
throw new \Error('When using a class instance as a callable, the class must be autoloadable');
|
||||||
|
}
|
||||||
|
|
||||||
|
if (\is_array($callable) && $callable[0] instanceof \__PHP_Incomplete_Class) {
|
||||||
|
throw new \Error('When using a class instance method as a callable, the class must be autoloadable');
|
||||||
|
}
|
||||||
|
|
||||||
|
return $callable(...$this->args);
|
||||||
|
}
|
||||||
|
}
|
@ -3,7 +3,7 @@
|
|||||||
namespace Amp\ParallelFunctions;
|
namespace Amp\ParallelFunctions;
|
||||||
|
|
||||||
use Amp\MultiReasonException;
|
use Amp\MultiReasonException;
|
||||||
use Amp\Parallel\Worker\CallableTask;
|
use Amp\Parallel\Sync\SerializationException;
|
||||||
use Amp\Parallel\Worker\Pool;
|
use Amp\Parallel\Worker\Pool;
|
||||||
use Amp\Promise;
|
use Amp\Promise;
|
||||||
use Opis\Closure\SerializableClosure;
|
use Opis\Closure\SerializableClosure;
|
||||||
@ -18,15 +18,21 @@ use function Amp\Promise\any;
|
|||||||
* @param Pool|null $pool Worker pool instance to use or null to use the global pool.
|
* @param Pool|null $pool Worker pool instance to use or null to use the global pool.
|
||||||
*
|
*
|
||||||
* @return callable Callable executing in another thread / process.
|
* @return callable Callable executing in another thread / process.
|
||||||
* @throws \Error If the passed callable is not safely serializable.
|
* @throws SerializationException If the passed callable is not safely serializable.
|
||||||
*/
|
*/
|
||||||
function parallel(callable $callable, Pool $pool = null): callable {
|
function parallel(callable $callable, Pool $pool = null): callable {
|
||||||
if ($callable instanceof \Closure) {
|
if ($callable instanceof \Closure) {
|
||||||
$callable = new SerializableClosure($callable);
|
$callable = new SerializableClosure($callable);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
try {
|
||||||
|
$callable = \serialize($callable);
|
||||||
|
} catch (\Throwable $e) {
|
||||||
|
throw new SerializationException("Unsupported callable: " . $e->getMessage(), 0, $e);
|
||||||
|
}
|
||||||
|
|
||||||
return function (...$args) use ($pool, $callable): Promise {
|
return function (...$args) use ($pool, $callable): Promise {
|
||||||
$task = new CallableTask($callable, $args);
|
$task = new Internal\SerializedCallableTask($callable, $args);
|
||||||
return $pool ? $pool->enqueue($task) : enqueue($task);
|
return $pool ? $pool->enqueue($task) : enqueue($task);
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
@ -24,7 +24,7 @@ class UnserializableClass {
|
|||||||
class ParallelTest extends TestCase {
|
class ParallelTest extends TestCase {
|
||||||
public function testUnserializableClosure() {
|
public function testUnserializableClosure() {
|
||||||
$this->expectException(SerializationException::class);
|
$this->expectException(SerializationException::class);
|
||||||
$this->expectExceptionMessage('The given data cannot be sent because it is not serializable');
|
$this->expectExceptionMessage("Unsupported callable: Serialization of 'class@anonymous' is not allowed");
|
||||||
|
|
||||||
$unserializable = new class {
|
$unserializable = new class {
|
||||||
};
|
};
|
||||||
@ -74,7 +74,7 @@ class ParallelTest extends TestCase {
|
|||||||
|
|
||||||
public function testUnserializableCallable() {
|
public function testUnserializableCallable() {
|
||||||
$this->expectException(SerializationException::class);
|
$this->expectException(SerializationException::class);
|
||||||
$this->expectExceptionMessage("The given data cannot be sent because it is not serializable");
|
$this->expectExceptionMessage("Unsupported callable: Serialization of 'class@anonymous' is not allowed");
|
||||||
|
|
||||||
$callable = new class {
|
$callable = new class {
|
||||||
public function __invoke() {
|
public function __invoke() {
|
||||||
|
Loading…
Reference in New Issue
Block a user