From 6ff9ee6c4c93084a4d273e6f5ed2461dbac818b1 Mon Sep 17 00:00:00 2001 From: Aaron Piotrowski Date: Thu, 14 Dec 2017 20:26:03 -0600 Subject: [PATCH] Add Pool parameter to function Allows specifying a specific worker pool instead of the global pool. --- src/functions.php | 39 ++++++++++++++++++++++----------------- test/ParallelTest.php | 16 ++++++++++++++++ 2 files changed, 38 insertions(+), 17 deletions(-) diff --git a/src/functions.php b/src/functions.php index 53fd862..77341aa 100644 --- a/src/functions.php +++ b/src/functions.php @@ -3,6 +3,7 @@ namespace Amp\ParallelFunctions; use Amp\MultiReasonException; +use Amp\Parallel\Worker\Pool; use Amp\ParallelFunctions\Internal\ParallelTask; use Amp\Promise; use Opis\Closure\SerializableClosure; @@ -13,12 +14,13 @@ use function Amp\Promise\any; /** * Parallelizes a callable. * - * @param callable $callable Callable to parallelize. + * @param callable $callable Callable to parallelize. + * @param Pool|null $pool Worker pool instance to use or null to use the global pool. * * @return callable Callable executing in another thread / process. * @throws \Error If the passed callable is not safely serializable. */ -function parallel(callable $callable): callable { +function parallel(callable $callable, Pool $pool = null): callable { try { if (\is_string($callable)) { $payload = \serialize($callable); @@ -31,25 +33,27 @@ function parallel(callable $callable): callable { throw new \Error('Unsupported callable: ' . $e->getMessage()); } - return function (...$args) use ($payload): Promise { - return enqueue(new ParallelTask($payload, $args)); + return function (...$args) use ($pool, $payload): Promise { + $task = new ParallelTask($payload, $args); + return $pool ? $pool->enqueue($task) : enqueue($task); }; } /** * Parallel version of array_map, but with an argument order consistent with the filter function. * - * @param array $array - * @param callable $callable + * @param array $array + * @param callable $callable + * @param Pool|null $pool Worker pool instance to use or null to use the global pool. * * @return Promise Resolves to the result once the operation finished. * @throws \Error If the passed callable is not safely serializable. */ -function parallelMap(array $array, callable $callable): Promise { - return call(function () use ($array, $callable) { +function parallelMap(array $array, callable $callable, Pool $pool = null): Promise { + return call(function () use ($array, $callable, $pool) { // Amp\Promise\any() guarantees that all operations finished prior to resolving. Amp\Promise\all() doesn't. // Additionally, we return all errors as a MultiReasonException instead of throwing on the first error. - list($errors, $results) = yield any(\array_map(parallel($callable), $array)); + list($errors, $results) = yield any(\array_map(parallel($callable, $pool), $array)); if ($errors) { throw new MultiReasonException($errors); @@ -62,15 +66,16 @@ function parallelMap(array $array, callable $callable): Promise { /** * Parallel version of array_filter. * - * @param array $array - * @param callable $callable - * @param int $flag + * @param array $array + * @param callable $callable + * @param int $flag + * @param Pool|null $pool Worker pool instance to use or null to use the global pool. * * @return Promise * @throws \Error If the passed callable is not safely serializable. */ -function parallelFilter(array $array, callable $callable = null, int $flag = 0): Promise { - return call(function () use ($array, $callable, $flag) { +function parallelFilter(array $array, callable $callable = null, int $flag = 0, Pool $pool = null): Promise { + return call(function () use ($array, $callable, $flag, $pool) { if ($callable === null) { if ($flag === \ARRAY_FILTER_USE_BOTH || $flag === \ARRAY_FILTER_USE_KEY) { throw new \Error('A valid $callable must be provided if $flag is set.'); @@ -84,11 +89,11 @@ function parallelFilter(array $array, callable $callable = null, int $flag = 0): // Amp\Promise\any() guarantees that all operations finished prior to resolving. Amp\Promise\all() doesn't. // Additionally, we return all errors as a MultiReasonException instead of throwing on the first error. if ($flag === \ARRAY_FILTER_USE_BOTH) { - list($errors, $results) = yield any(\array_map(parallel($callable), $array, \array_keys($array))); + list($errors, $results) = yield any(\array_map(parallel($callable, $pool), $array, \array_keys($array))); } elseif ($flag === \ARRAY_FILTER_USE_KEY) { - list($errors, $results) = yield any(\array_map(parallel($callable), \array_keys($array))); + list($errors, $results) = yield any(\array_map(parallel($callable, $pool), \array_keys($array))); } else { - list($errors, $results) = yield any(\array_map(parallel($callable), $array)); + list($errors, $results) = yield any(\array_map(parallel($callable, $pool), $array)); } if ($errors) { diff --git a/test/ParallelTest.php b/test/ParallelTest.php index 390c1e2..82013e9 100644 --- a/test/ParallelTest.php +++ b/test/ParallelTest.php @@ -2,7 +2,10 @@ namespace Amp\ParallelFunctions\Test; +use Amp\Parallel\Worker\Pool; use Amp\PHPUnit\TestCase; +use Amp\Promise; +use Amp\Success; use function Amp\ParallelFunctions\parallel; class ParallelTest extends TestCase { @@ -17,4 +20,17 @@ class ParallelTest extends TestCase { return 1; }); } + + public function testCustomPool() { + $mock = $this->createMock(Pool::class); + $mock->expects($this->once()) + ->method("enqueue") + ->willReturn(new Success(1)); + + $callable = parallel(function () { + return 0; + }, $mock); + + $this->assertSame(1, Promise\wait($callable())); + } }