1
0
mirror of https://github.com/danog/parallel.git synced 2025-01-22 14:01:14 +01:00

Add CallableTask and enqueueCallable

This commit is contained in:
Niklas Keller 2018-11-01 09:36:21 +01:00 committed by Aaron Piotrowski
parent 4c3c93e46a
commit 912047f2ec
7 changed files with 121 additions and 54 deletions

View File

@ -1,42 +0,0 @@
<?php
namespace Amp\Parallel\Example;
use Amp\Parallel\Worker\Environment;
use Amp\Parallel\Worker\Task;
class BlockingTask implements Task
{
/**
* @var callable
*/
private $function;
/**
* @var mixed[]
*/
private $args;
/**
* @param callable $function Do not use a closure or non-serializable object.
* @param mixed ...$args Arguments to pass to the function. Must be serializable.
*/
public function __construct(callable $function, ...$args)
{
$this->function = $function;
$this->args = $args;
}
/**
* {@inheritdoc}
*/
public function run(Environment $environment)
{
return ($this->function)(...$this->args);
}
public function getArgs()
{
return $this->args;
}
}

View File

@ -0,0 +1,24 @@
#!/usr/bin/env php
<?php
require \dirname(__DIR__) . '/vendor/autoload.php';
use Amp\Parallel\Worker;
use Amp\Promise;
$urls = [
'https://secure.php.net',
'https://amphp.org',
'https://github.com',
];
$promises = [];
foreach ($urls as $url) {
$promises[$url] = Worker\enqueueCallable('file_get_contents', $url);
}
$responses = Promise\wait(Promise\all($promises));
foreach ($responses as $url => $response) {
\printf("Read %d bytes from %s\n", \strlen($response), $url);
}

View File

@ -1,9 +1,10 @@
#!/usr/bin/env php
<?php
require \dirname(__DIR__).'/vendor/autoload.php';
require \dirname(__DIR__) . '/vendor/autoload.php';
use Amp\Loop;
use Amp\Parallel\Example\BlockingTask;
use Amp\Parallel\Worker\CallableTask;
use Amp\Parallel\Worker\DefaultPool;
// A variable to store our fetched results
@ -11,9 +12,9 @@ $results = [];
// We can first define tasks and then run them
$tasks = [
new BlockingTask('file_get_contents', 'http://php.net'),
new BlockingTask('file_get_contents', 'https://amphp.org'),
new BlockingTask('file_get_contents', 'https://github.com'),
new CallableTask('file_get_contents', 'http://php.net'),
new CallableTask('file_get_contents', 'https://amphp.org'),
new CallableTask('file_get_contents', 'https://github.com'),
];
// Event loop for parallel tasks

View File

@ -1,8 +1,8 @@
#!/usr/bin/env php
<?php
require \dirname(__DIR__).'/vendor/autoload.php';
require \dirname(__DIR__) . '/vendor/autoload.php';
use Amp\Parallel\Example\BlockingTask;
use Amp\Parallel\Worker\CallableTask;
use Amp\Parallel\Worker\DefaultWorkerFactory;
Amp\Loop::run(function () {
@ -10,7 +10,7 @@ Amp\Loop::run(function () {
$worker = $factory->create();
$result = yield $worker->enqueue(new BlockingTask('file_get_contents', 'https://google.com'));
$result = yield $worker->enqueue(new CallableTask('file_get_contents', 'https://google.com'));
\printf("Read %d bytes\n", \strlen($result));
$code = yield $worker->shutdown();

View File

@ -0,0 +1,38 @@
<?php
namespace Amp\Parallel\Worker;
/**
* Task implementation dispatching a simple callable.
*/
final class CallableTask implements Task
{
/** @var string */
private $callable;
/** @var mixed[] */
private $args;
/**
* @param callable $callable Callable will be serialized.
* @param mixed $args Arguments to pass to the function. Must be serializable.
*/
public function __construct(callable $callable, array $args)
{
$this->callable = $callable;
$this->args = $args;
}
public function run(Environment $environment)
{
if ($this->callable instanceof \__PHP_Incomplete_Class) {
throw new \Error('When using a class instance as a callable, the class must be autoloadable');
}
if (\is_array($this->callable) && ($this->callable[0] ?? null) instanceof \__PHP_Incomplete_Class) {
throw new \Error('When using a class instance method as a callable, the class must be autoloadable');
}
return ($this->callable)(...$this->args);
}
}

View File

@ -33,15 +33,28 @@ function pool(Pool $pool = null): Pool
/**
* Enqueues a task to be executed by the global worker pool.
*
* @param \Amp\Parallel\Worker\Task $task The task to enqueue.
* @param Task $task The task to enqueue.
*
* @return \Amp\Promise<mixed>
* @return Promise<mixed>
*/
function enqueue(Task $task): Promise
{
return pool()->enqueue($task);
}
/**
* Enqueues a callable to be executed by the global worker pool.
*
* @param callable $callable Callable needs to be serializable.
* @param mixed ...$args Arguments have to be serializable.
*
* @return Promise<mixed>
*/
function enqueueCallable(callable $callable, ...$args)
{
return enqueue(new CallableTask($callable, $args));
}
/**
* Gets a worker from the global worker pool.
*

View File

@ -44,6 +44,40 @@ class FunctionsTest extends TestCase
$this->assertSame($value, Promise\wait($awaitable));
}
/**
* @depends testPool
*/
public function testEnqueueCallable()
{
$pool = $this->createMock(Pool::class);
$pool->method('enqueue')
->will($this->returnCallback(function (Task $task): Promise {
return new Success($task->run($this->createMock(Environment::class)));
}));
Worker\pool($pool);
$value = 42;
$promise = Worker\enqueueCallable('strval', $value);
$this->assertSame('42', Promise\wait($promise));
}
/**
* @depends testEnqueueCallable
*/
public function testEnqueueCallableIntegration()
{
Worker\pool(new Worker\DefaultPool());
$value = 42;
$promise = Worker\enqueueCallable('strval', $value);
$this->assertSame('42', Promise\wait($promise));
}
/**
* @depends testPool
*/
@ -79,7 +113,6 @@ class FunctionsTest extends TestCase
->will($this->returnValue($this->createMock(Worker\Worker::class)));
Worker\factory($factory);
$worker = Worker\create();
Worker\create(); // shouldn't throw
}
}