Add Pool parameter to function

Allows specifying a specific worker pool instead of the global pool.
This commit is contained in:
Aaron Piotrowski 2017-12-14 20:26:03 -06:00
parent d57bfecbf5
commit 6ff9ee6c4c
No known key found for this signature in database
GPG Key ID: ADD1EF783EDE9EEB
2 changed files with 38 additions and 17 deletions

View File

@ -3,6 +3,7 @@
namespace Amp\ParallelFunctions; namespace Amp\ParallelFunctions;
use Amp\MultiReasonException; use Amp\MultiReasonException;
use Amp\Parallel\Worker\Pool;
use Amp\ParallelFunctions\Internal\ParallelTask; use Amp\ParallelFunctions\Internal\ParallelTask;
use Amp\Promise; use Amp\Promise;
use Opis\Closure\SerializableClosure; use Opis\Closure\SerializableClosure;
@ -13,12 +14,13 @@ use function Amp\Promise\any;
/** /**
* Parallelizes a callable. * Parallelizes a callable.
* *
* @param callable $callable Callable to parallelize. * @param callable $callable Callable to parallelize.
* @param Pool|null $pool Worker pool instance to use or null to use the global pool.
* *
* @return callable Callable executing in another thread / process. * @return callable Callable executing in another thread / process.
* @throws \Error If the passed callable is not safely serializable. * @throws \Error If the passed callable is not safely serializable.
*/ */
function parallel(callable $callable): callable { function parallel(callable $callable, Pool $pool = null): callable {
try { try {
if (\is_string($callable)) { if (\is_string($callable)) {
$payload = \serialize($callable); $payload = \serialize($callable);
@ -31,25 +33,27 @@ function parallel(callable $callable): callable {
throw new \Error('Unsupported callable: ' . $e->getMessage()); throw new \Error('Unsupported callable: ' . $e->getMessage());
} }
return function (...$args) use ($payload): Promise { return function (...$args) use ($pool, $payload): Promise {
return enqueue(new ParallelTask($payload, $args)); $task = new ParallelTask($payload, $args);
return $pool ? $pool->enqueue($task) : enqueue($task);
}; };
} }
/** /**
* Parallel version of array_map, but with an argument order consistent with the filter function. * Parallel version of array_map, but with an argument order consistent with the filter function.
* *
* @param array $array * @param array $array
* @param callable $callable * @param callable $callable
* @param Pool|null $pool Worker pool instance to use or null to use the global pool.
* *
* @return Promise Resolves to the result once the operation finished. * @return Promise Resolves to the result once the operation finished.
* @throws \Error If the passed callable is not safely serializable. * @throws \Error If the passed callable is not safely serializable.
*/ */
function parallelMap(array $array, callable $callable): Promise { function parallelMap(array $array, callable $callable, Pool $pool = null): Promise {
return call(function () use ($array, $callable) { return call(function () use ($array, $callable, $pool) {
// Amp\Promise\any() guarantees that all operations finished prior to resolving. Amp\Promise\all() doesn't. // Amp\Promise\any() guarantees that all operations finished prior to resolving. Amp\Promise\all() doesn't.
// Additionally, we return all errors as a MultiReasonException instead of throwing on the first error. // Additionally, we return all errors as a MultiReasonException instead of throwing on the first error.
list($errors, $results) = yield any(\array_map(parallel($callable), $array)); list($errors, $results) = yield any(\array_map(parallel($callable, $pool), $array));
if ($errors) { if ($errors) {
throw new MultiReasonException($errors); throw new MultiReasonException($errors);
@ -62,15 +66,16 @@ function parallelMap(array $array, callable $callable): Promise {
/** /**
* Parallel version of array_filter. * Parallel version of array_filter.
* *
* @param array $array * @param array $array
* @param callable $callable * @param callable $callable
* @param int $flag * @param int $flag
* @param Pool|null $pool Worker pool instance to use or null to use the global pool.
* *
* @return Promise * @return Promise
* @throws \Error If the passed callable is not safely serializable. * @throws \Error If the passed callable is not safely serializable.
*/ */
function parallelFilter(array $array, callable $callable = null, int $flag = 0): Promise { function parallelFilter(array $array, callable $callable = null, int $flag = 0, Pool $pool = null): Promise {
return call(function () use ($array, $callable, $flag) { return call(function () use ($array, $callable, $flag, $pool) {
if ($callable === null) { if ($callable === null) {
if ($flag === \ARRAY_FILTER_USE_BOTH || $flag === \ARRAY_FILTER_USE_KEY) { if ($flag === \ARRAY_FILTER_USE_BOTH || $flag === \ARRAY_FILTER_USE_KEY) {
throw new \Error('A valid $callable must be provided if $flag is set.'); throw new \Error('A valid $callable must be provided if $flag is set.');
@ -84,11 +89,11 @@ function parallelFilter(array $array, callable $callable = null, int $flag = 0):
// Amp\Promise\any() guarantees that all operations finished prior to resolving. Amp\Promise\all() doesn't. // Amp\Promise\any() guarantees that all operations finished prior to resolving. Amp\Promise\all() doesn't.
// Additionally, we return all errors as a MultiReasonException instead of throwing on the first error. // Additionally, we return all errors as a MultiReasonException instead of throwing on the first error.
if ($flag === \ARRAY_FILTER_USE_BOTH) { if ($flag === \ARRAY_FILTER_USE_BOTH) {
list($errors, $results) = yield any(\array_map(parallel($callable), $array, \array_keys($array))); list($errors, $results) = yield any(\array_map(parallel($callable, $pool), $array, \array_keys($array)));
} elseif ($flag === \ARRAY_FILTER_USE_KEY) { } elseif ($flag === \ARRAY_FILTER_USE_KEY) {
list($errors, $results) = yield any(\array_map(parallel($callable), \array_keys($array))); list($errors, $results) = yield any(\array_map(parallel($callable, $pool), \array_keys($array)));
} else { } else {
list($errors, $results) = yield any(\array_map(parallel($callable), $array)); list($errors, $results) = yield any(\array_map(parallel($callable, $pool), $array));
} }
if ($errors) { if ($errors) {

View File

@ -2,7 +2,10 @@
namespace Amp\ParallelFunctions\Test; namespace Amp\ParallelFunctions\Test;
use Amp\Parallel\Worker\Pool;
use Amp\PHPUnit\TestCase; use Amp\PHPUnit\TestCase;
use Amp\Promise;
use Amp\Success;
use function Amp\ParallelFunctions\parallel; use function Amp\ParallelFunctions\parallel;
class ParallelTest extends TestCase { class ParallelTest extends TestCase {
@ -17,4 +20,17 @@ class ParallelTest extends TestCase {
return 1; return 1;
}); });
} }
public function testCustomPool() {
$mock = $this->createMock(Pool::class);
$mock->expects($this->once())
->method("enqueue")
->willReturn(new Success(1));
$callable = parallel(function () {
return 0;
}, $mock);
$this->assertSame(1, Promise\wait($callable()));
}
} }