mirror of
https://github.com/danog/amp.git
synced 2025-01-22 05:11:42 +01:00
Add Combinator class
This commit is contained in:
parent
6b7c24e160
commit
969d15ebe8
229
lib/Combinator.php
Normal file
229
lib/Combinator.php
Normal file
@ -0,0 +1,229 @@
|
||||
<?php
|
||||
|
||||
namespace Amp;
|
||||
|
||||
class Combinator {
|
||||
private $reactor;
|
||||
|
||||
/**
|
||||
* @param \Amp\Reactor $reactor
|
||||
*/
|
||||
public function __construct(Reactor $reactor = null) {
|
||||
$this->reactor = $reactor ?: reactor();
|
||||
}
|
||||
|
||||
/**
|
||||
* If any one of the Promises fails the resulting Promise will fail. Otherwise
|
||||
* the resulting Promise succeeds with an array matching keys from the input array
|
||||
* to their resolved values.
|
||||
*
|
||||
* @param array[\Amp\Promise] $promises
|
||||
* @return \Amp\Promise
|
||||
*/
|
||||
public function all(array $promises) {
|
||||
if (empty($promises)) {
|
||||
return new Success([]);
|
||||
}
|
||||
|
||||
$results = [];
|
||||
$count = count($promises);
|
||||
$future = new Future($this->reactor);
|
||||
$done = false;
|
||||
|
||||
foreach ($promises as $key => $promise) {
|
||||
$promise = ($promise instanceof Promise) ? $promise : new Success($promise);
|
||||
$promise->when(function($error, $result) use (&$count, &$results, $key, $future, &$done) {
|
||||
if ($done) {
|
||||
// If the future already failed we don't bother.
|
||||
return;
|
||||
}
|
||||
if ($error) {
|
||||
$done = true;
|
||||
$future->fail($error);
|
||||
return;
|
||||
}
|
||||
|
||||
$results[$key] = $result;
|
||||
if (--$count === 0) {
|
||||
$done = true;
|
||||
$future->succeed($results);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
return $future->promise();
|
||||
}
|
||||
|
||||
/**
|
||||
* Resolves with a two-item array delineating successful and failed Promise results.
|
||||
*
|
||||
* The resulting Promise will only fail if ALL of the Promise values fail or if the
|
||||
* Promise array is empty.
|
||||
*
|
||||
* The resulting Promise is resolved with an indexed two-item array of the following form:
|
||||
*
|
||||
* [$arrayOfFailures, $arrayOfSuccesses]
|
||||
*
|
||||
* The individual keys in the resulting arrays are preserved from the initial Promise array
|
||||
* passed to the function for evaluation.
|
||||
*
|
||||
* @param array[\Amp\Promise] $promises
|
||||
* @return \Amp\Promise
|
||||
*/
|
||||
public function some(array $promises) {
|
||||
if (empty($promises)) {
|
||||
return new Failure(new \LogicException(
|
||||
'No promises or values provided'
|
||||
));
|
||||
}
|
||||
|
||||
$results = $errors = [];
|
||||
$count = count($promises);
|
||||
$future = new Future($this->reactor);
|
||||
|
||||
foreach ($promises as $key => $promise) {
|
||||
$promise = ($promise instanceof Promise) ? $promise : new Success($promise);
|
||||
$promise->when(function($error, $result) use (&$count, &$results, &$errors, $key, $future) {
|
||||
if ($error) {
|
||||
$errors[$key] = $error;
|
||||
} else {
|
||||
$results[$key] = $result;
|
||||
}
|
||||
|
||||
if (--$count > 0) {
|
||||
return;
|
||||
} elseif (empty($results)) {
|
||||
$future->fail(new \RuntimeException(
|
||||
'All promises failed'
|
||||
));
|
||||
} else {
|
||||
$future->succeed([$errors, $results]);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
return $future->promise();
|
||||
}
|
||||
|
||||
/**
|
||||
* Resolves with the first successful Promise value. The resulting Promise will only fail if all
|
||||
* Promise values in the group fail or if the initial Promise array is empty.
|
||||
*
|
||||
* @param array[\Amp\Promise] $promises
|
||||
* @return \Amp\Promise
|
||||
*/
|
||||
public function first(array $promises) {
|
||||
if (empty($promises)) {
|
||||
return new Failure(new \LogicException(
|
||||
'No promises or values provided'
|
||||
));
|
||||
}
|
||||
|
||||
$count = count($promises);
|
||||
$done = false;
|
||||
$future = new Future($this->reactor);
|
||||
|
||||
foreach ($promises as $promise) {
|
||||
$promise = ($promise instanceof Promise) ? $promise : new Success($promise);
|
||||
$promise->when(function($error, $result) use (&$count, &$done, $future) {
|
||||
if ($done) {
|
||||
// we don't care about Futures that resolve after the first
|
||||
return;
|
||||
} elseif ($error && --$count === 0) {
|
||||
$future->fail(new \RuntimeException(
|
||||
'All promises failed'
|
||||
));
|
||||
} elseif (empty($error)) {
|
||||
$done = true;
|
||||
$this->succeed($result);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
return $future->promise();
|
||||
}
|
||||
|
||||
/**
|
||||
* Map future values using the specified callable
|
||||
*
|
||||
* @param array $promises
|
||||
* @param callable $func
|
||||
* @return \Amp\Promise
|
||||
*/
|
||||
public function map(array $promises, callable $func) {
|
||||
if (empty($promises)) {
|
||||
return new Success([]);
|
||||
}
|
||||
|
||||
$results = [];
|
||||
$count = count($promises);
|
||||
$future = new Future($this->reactor);
|
||||
$done = false;
|
||||
|
||||
foreach ($promises as $key => $promise) {
|
||||
$promise = ($promise instanceof Promise) ? $promise : new Success($promise);
|
||||
$promise->when(function($error, $result) use (&$count, &$results, $key, $future, $func, &$done) {
|
||||
if ($done) {
|
||||
// If the future already failed we don't bother.
|
||||
return;
|
||||
}
|
||||
if ($error) {
|
||||
$done = true;
|
||||
$future->fail($error);
|
||||
return;
|
||||
}
|
||||
|
||||
$results[$key] = $func($result);
|
||||
if (--$count === 0) {
|
||||
$future->succeed($results);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
return $future->promise();
|
||||
}
|
||||
|
||||
/**
|
||||
* Filter future values using the specified callable
|
||||
*
|
||||
* If the functor returns a truthy value the resolved promise result is retained, otherwise it is
|
||||
* discarded. Array keys are retained for any results not filtered out by the functor.
|
||||
*
|
||||
* @param array $promises
|
||||
* @param callable $func
|
||||
* @return \Amp\Promise
|
||||
*/
|
||||
public function filter(array $promises, callable $func) {
|
||||
if (empty($promises)) {
|
||||
return new Success([]);
|
||||
}
|
||||
|
||||
$results = [];
|
||||
$count = count($promises);
|
||||
$future = new Future($this->reactor);
|
||||
$done = false;
|
||||
|
||||
foreach ($promises as $key => $promise) {
|
||||
$promise = ($promise instanceof Promise) ? $promise : new Success($promise);
|
||||
$promise->when(function($error, $result) use (&$count, &$results, $key, $future, $func, &$done) {
|
||||
if ($done) {
|
||||
// If the future result already failed we don't bother.
|
||||
return;
|
||||
}
|
||||
if ($error) {
|
||||
$done = true;
|
||||
$future->fail($error);
|
||||
return;
|
||||
}
|
||||
if ($func($result)) {
|
||||
$results[$key] = $result;
|
||||
}
|
||||
if (--$count === 0) {
|
||||
$future->succeed($results);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
return $future->promise();
|
||||
}
|
||||
}
|
@ -200,19 +200,6 @@ function stop() {
|
||||
$reactor->stop();
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the global event reactor
|
||||
*
|
||||
* Note that the $factory callable is only invoked if no global reactor has yet been initialized.
|
||||
*
|
||||
* @param callable $factory Optional factory callable for initializing a reactor
|
||||
* @return \Amp\Reactor
|
||||
*/
|
||||
function reactor(callable $factory = null) {
|
||||
static $reactor;
|
||||
return ($reactor = $reactor ?: ReactorFactory::select($factory));
|
||||
}
|
||||
|
||||
/**
|
||||
* React to process control signals
|
||||
*
|
||||
@ -237,6 +224,36 @@ function onSignal($signo, callable $onSignal) {
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the global event reactor
|
||||
*
|
||||
* Note that the $factory callable is only invoked if no global reactor has yet been initialized.
|
||||
*
|
||||
* @param callable $factory Optional factory callable for initializing a reactor
|
||||
* @return \Amp\Reactor
|
||||
*/
|
||||
function reactor(callable $factory = null) {
|
||||
static $reactor;
|
||||
return ($reactor = $reactor ?: ReactorFactory::select($factory));
|
||||
}
|
||||
|
||||
/**
|
||||
* Get a singleton combinator instance
|
||||
*
|
||||
* @param callable $factory
|
||||
* @return \Amp\Combinator
|
||||
*/
|
||||
function combinator(callable $factory = null) {
|
||||
static $combinator;
|
||||
if ($factory) {
|
||||
return $combinator = $factory();
|
||||
} elseif ($combinator) {
|
||||
return $combinator;
|
||||
} else {
|
||||
return $combinator = new Combinator(reactor());
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* If any one of the Promises fails the resulting Promise will fail. Otherwise
|
||||
* the resulting Promise succeeds with an array matching keys from the input array
|
||||
@ -246,37 +263,7 @@ function onSignal($signo, callable $onSignal) {
|
||||
* @return \Amp\Promise
|
||||
*/
|
||||
function all(array $promises) {
|
||||
if (empty($promises)) {
|
||||
return new Success([]);
|
||||
}
|
||||
|
||||
$results = [];
|
||||
$count = count($promises);
|
||||
$future = new Future;
|
||||
$done = false;
|
||||
|
||||
foreach ($promises as $key => $promise) {
|
||||
$promise = ($promise instanceof Promise) ? $promise : new Success($promise);
|
||||
$promise->when(function($error, $result) use (&$count, &$results, $key, $future, &$done) {
|
||||
if ($done) {
|
||||
// If the future already failed we don't bother.
|
||||
return;
|
||||
}
|
||||
if ($error) {
|
||||
$done = true;
|
||||
$future->fail($error);
|
||||
return;
|
||||
}
|
||||
|
||||
$results[$key] = $result;
|
||||
if (--$count === 0) {
|
||||
$done = true;
|
||||
$future->succeed($results);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
return $future->promise();
|
||||
return combinator()->all($promises);
|
||||
}
|
||||
|
||||
/**
|
||||
@ -296,38 +283,7 @@ function all(array $promises) {
|
||||
* @return \Amp\Promise
|
||||
*/
|
||||
function some(array $promises) {
|
||||
if (empty($promises)) {
|
||||
return new Failure(new \LogicException(
|
||||
'No promises or values provided'
|
||||
));
|
||||
}
|
||||
|
||||
$results = $errors = [];
|
||||
$count = count($promises);
|
||||
$future = new Future;
|
||||
|
||||
foreach ($promises as $key => $promise) {
|
||||
$promise = ($promise instanceof Promise) ? $promise : new Success($promise);
|
||||
$promise->when(function($error, $result) use (&$count, &$results, &$errors, $key, $future) {
|
||||
if ($error) {
|
||||
$errors[$key] = $error;
|
||||
} else {
|
||||
$results[$key] = $result;
|
||||
}
|
||||
|
||||
if (--$count > 0) {
|
||||
return;
|
||||
} elseif (empty($results)) {
|
||||
$future->fail(new \RuntimeException(
|
||||
'All promises failed'
|
||||
));
|
||||
} else {
|
||||
$future->succeed([$errors, $results]);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
return $future->promise();
|
||||
return combinator()->some($promises);
|
||||
}
|
||||
|
||||
/**
|
||||
@ -338,34 +294,7 @@ function some(array $promises) {
|
||||
* @return \Amp\Promise
|
||||
*/
|
||||
function first(array $promises) {
|
||||
if (empty($promises)) {
|
||||
return new Failure(new \LogicException(
|
||||
'No promises or values provided'
|
||||
));
|
||||
}
|
||||
|
||||
$count = count($promises);
|
||||
$done = false;
|
||||
$future = new Future;
|
||||
|
||||
foreach ($promises as $promise) {
|
||||
$promise = ($promise instanceof Promise) ? $promise : new Success($promise);
|
||||
$promise->when(function($error, $result) use (&$count, &$done, $future) {
|
||||
if ($done) {
|
||||
// we don't care about Futures that resolve after the first
|
||||
return;
|
||||
} elseif ($error && --$count === 0) {
|
||||
$future->fail(new \RuntimeException(
|
||||
'All promises failed'
|
||||
));
|
||||
} elseif (empty($error)) {
|
||||
$done = true;
|
||||
$this->succeed($result);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
return $future->promise();
|
||||
return combinator()->first($promises);
|
||||
}
|
||||
|
||||
/**
|
||||
@ -376,36 +305,7 @@ function first(array $promises) {
|
||||
* @return \Amp\Promise
|
||||
*/
|
||||
function map(array $promises, callable $func) {
|
||||
if (empty($promises)) {
|
||||
return new Success([]);
|
||||
}
|
||||
|
||||
$results = [];
|
||||
$count = count($promises);
|
||||
$future = new Future;
|
||||
$done = false;
|
||||
|
||||
foreach ($promises as $key => $promise) {
|
||||
$promise = ($promise instanceof Promise) ? $promise : new Success($promise);
|
||||
$promise->when(function($error, $result) use (&$count, &$results, $key, $future, $func, &$done) {
|
||||
if ($done) {
|
||||
// If the future already failed we don't bother.
|
||||
return;
|
||||
}
|
||||
if ($error) {
|
||||
$done = true;
|
||||
$future->fail($error);
|
||||
return;
|
||||
}
|
||||
|
||||
$results[$key] = $func($result);
|
||||
if (--$count === 0) {
|
||||
$future->succeed($results);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
return $future->promise();
|
||||
return combinator()->map($promises);
|
||||
}
|
||||
|
||||
/**
|
||||
@ -419,37 +319,7 @@ function map(array $promises, callable $func) {
|
||||
* @return \Amp\Promise
|
||||
*/
|
||||
function filter(array $promises, callable $func) {
|
||||
if (empty($promises)) {
|
||||
return new Success([]);
|
||||
}
|
||||
|
||||
$results = [];
|
||||
$count = count($promises);
|
||||
$future = new Future;
|
||||
$done = false;
|
||||
|
||||
foreach ($promises as $key => $promise) {
|
||||
$promise = ($promise instanceof Promise) ? $promise : new Success($promise);
|
||||
$promise->when(function($error, $result) use (&$count, &$results, $key, $future, $func, &$done) {
|
||||
if ($done) {
|
||||
// If the future result already failed we don't bother.
|
||||
return;
|
||||
}
|
||||
if ($error) {
|
||||
$done = true;
|
||||
$future->fail($error);
|
||||
return;
|
||||
}
|
||||
if ($func($result)) {
|
||||
$results[$key] = $result;
|
||||
}
|
||||
if (--$count === 0) {
|
||||
$future->succeed($results);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
return $future->promise();
|
||||
return combinator()->filter($promises);
|
||||
}
|
||||
|
||||
/**
|
||||
|
81
test/CombinatorTest.php
Normal file
81
test/CombinatorTest.php
Normal file
@ -0,0 +1,81 @@
|
||||
<?php
|
||||
|
||||
namespace Amp\Test;
|
||||
|
||||
use Amp\Success;
|
||||
use Amp\Failure;
|
||||
use Amp\Combinator;
|
||||
use Amp\NativeReactor;
|
||||
|
||||
class CombinatorTest extends \PHPUnit_Framework_TestCase {
|
||||
|
||||
private function getCombinator() {
|
||||
$reactor = new NativeReactor;
|
||||
return [new Combinator($reactor), $reactor];
|
||||
}
|
||||
|
||||
public function testAllResolvesWithArrayOfResults() {
|
||||
$promises = [
|
||||
'r1' => new Success(42),
|
||||
'r2' => new Success(41),
|
||||
];
|
||||
|
||||
$expected = ['r1' => 42, 'r2' => 41];
|
||||
|
||||
list($combinator) = $this->getCombinator();
|
||||
$results = $combinator->all($promises)->wait();
|
||||
$this->assertSame($expected, $results);
|
||||
}
|
||||
|
||||
/**
|
||||
* @expectedException \RuntimeException
|
||||
* @expectedExceptionMessage zanzibar
|
||||
*/
|
||||
public function testAllThrowsIfAnyIndividualPromiseFails() {
|
||||
$exception = new \RuntimeException('zanzibar');
|
||||
$promises = [
|
||||
'r1' => new Success(42),
|
||||
'r2' => new Failure($exception),
|
||||
'r3' => new Success(40),
|
||||
];
|
||||
|
||||
list($combinator) = $this->getCombinator();
|
||||
$results = $combinator->all($promises)->wait();
|
||||
}
|
||||
|
||||
public function testSomeReturnsArrayOfErrorsAndResults() {
|
||||
$exception = new \RuntimeException('zanzibar');
|
||||
$promises = [
|
||||
'r1' => new Success(42),
|
||||
'r2' => new Failure($exception),
|
||||
'r3' => new Success(40),
|
||||
];
|
||||
|
||||
list($combinator) = $this->getCombinator();
|
||||
list($errors, $results) = $combinator->some($promises)->wait();
|
||||
|
||||
$this->assertSame(['r2' => $exception], $errors);
|
||||
$this->assertSame(['r1' => 42, 'r3' => 40], $results);
|
||||
}
|
||||
|
||||
/**
|
||||
* @expectedException \RuntimeException
|
||||
* @expectedExceptionMessage All promises failed
|
||||
*/
|
||||
public function testSomeThrowsIfNoPromisesResolveSuccessfully() {
|
||||
$promises = [
|
||||
'r1' => new Failure(new \RuntimeException),
|
||||
'r2' => new Failure(new \RuntimeException),
|
||||
];
|
||||
list($combinator) = $this->getCombinator();
|
||||
list($errors, $results) = $combinator->some($promises)->wait();
|
||||
}
|
||||
|
||||
/**
|
||||
* @TODO testMap()
|
||||
*/
|
||||
|
||||
/**
|
||||
* @TODO testFilter()
|
||||
*/
|
||||
}
|
Loading…
x
Reference in New Issue
Block a user