1
0
mirror of https://github.com/danog/amp.git synced 2024-11-30 04:29:08 +01:00

Initial commit

This commit is contained in:
Aaron Piotrowski 2016-05-21 09:44:52 -05:00
commit c7f64ce2c0
16 changed files with 1180 additions and 0 deletions

6
.gitattributes vendored Normal file
View File

@ -0,0 +1,6 @@
example export-ignore
test export-ignore
.gitattributes export-ignore
.gitignore export-ignore
.travis.yml export-ignore
phpunit.xml.dist export-ignore

4
.gitignore vendored Normal file
View File

@ -0,0 +1,4 @@
build
composer.lock
phpunit.xml
vendor

22
LICENSE Normal file
View File

@ -0,0 +1,22 @@
The MIT License (MIT)
Copyright (c) 2016 amphp
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.

37
composer.json Normal file
View File

@ -0,0 +1,37 @@
{
"name": "amphp/awaitable",
"description": "",
"keywords": [
"asynchronous",
"async",
"awaitable",
"coroutine",
"promise",
"future",
"delayed"
],
"homepage": "http://amphp.org",
"license": "MIT",
"require": {
"async-interop/awaitable": "dev-master",
"async-interop/event-loop": "dev-master",
"async-interop/event-loop-implementation": "dev-master"
},
"require-dev": {
"amphp/loop": "dev-master"
},
"minimum-stability": "dev",
"autoload": {
"psr-4": {
"Amp\\Awaitable\\": "lib"
},
"files": [
"lib/functions.php"
]
},
"autoload-dev": {
"psr-4": {
"Amp\\Test\\Awaitable\\": "test"
}
}
}

97
lib/Coroutine.php Normal file
View File

@ -0,0 +1,97 @@
<?php
namespace Amp\Awaitable;
use Interop\Async\Awaitable;
use Interop\Async\Loop;
final class Coroutine implements Awaitable {
use Internal\Placeholder;
// Maximum number of immediate coroutine continuations before deferring next continuation to the loop.
const MAX_RECURSION_DEPTH = 3;
/**
* @var \Generator
*/
private $generator;
/**
* @var callable(\Throwable|\Exception|null $exception, mixed $value): void
*/
private $when;
/**
* @var int
*/
private $depth = 0;
/**
* @param \Generator $generator
*/
public function __construct(\Generator $generator)
{
$this->generator = $generator;
/**
* @param \Throwable|\Exception|null $exception Exception to be thrown into the generator.
* @param mixed $value The value to send to the generator.
*/
$this->when = function ($exception = null, $value = null) {
if (self::MAX_RECURSION_DEPTH < $this->depth) { // Defer continuation to avoid blowing up call stack.
Loop::defer(function () use ($exception, $value) {
$when = $this->when;
$when($exception, $value);
});
return;
}
try {
if ($exception) {
// Throw exception at current execution point.
$this->next($this->generator->throw($exception));
return;
}
// Send the new value and execute to next yield statement.
$this->next($this->generator->send($value), $value);
} catch (\Throwable $exception) {
$this->fail($exception);
} catch (\Exception $exception) {
$this->fail($exception);
}
};
try {
$this->next($this->generator->current());
} catch (\Throwable $exception) {
$this->fail($exception);
} catch (\Exception $exception) {
$this->fail($exception);
}
}
/**
* Examines the value yielded from the generator and prepares the next step in iteration.
*
* @param mixed $yielded Value yielded from generator.
* @param mixed $last Prior resolved value. No longer needed when PHP 5.x support is dropped.
*/
private function next($yielded, $last = null)
{
if (!$this->generator->valid()) {
$this->resolve(PHP_MAJOR_VERSION >= 7 ? $this->generator->getReturn() : $last);
return;
}
++$this->depth;
if ($yielded instanceof Awaitable) {
$yielded->when($this->when);
} else {
$this->resolve($yielded);
}
--$this->depth;
}
}

78
lib/Deferred.php Normal file
View File

@ -0,0 +1,78 @@
<?php
namespace Amp\Awaitable;
use Interop\Async\Awaitable;
try {
if (@assert(false)) {
production: // PHP 7 production environment (zend.assertions=0)
final class Deferred implements Awaitable {
use Internal\Placeholder {
resolve as public;
fail as public;
}
/**
* @return \Interop\Async\Awaitable
*/
public function getAwaitable() {
return $this;
}
}
} else {
development: // PHP 7 development (zend.assertions=1) or PHP 5.x
final class Deferred {
/**
* @var \Interop\Async\Awaitable
*/
private $awaitable;
/**
* @var callable
*/
private $resolve;
/**
* @var callable
*/
private $fail;
public function __construct() {
$this->awaitable = new Promise(function (callable $resolve, callable $fail) {
$this->resolve = $resolve;
$this->fail = $fail;
});
}
/**
* @return \Interop\Async\Awaitable
*/
public function getAwaitable() {
return $this->awaitable;
}
/**
* Fulfill the awaitable with the given value.
*
* @param mixed $value
*/
public function resolve($value = null) {
$resolve = $this->resolve;
$resolve($value);
}
/**
* Fails the awaitable the the given reason.
*
* @param \Throwable|\Exception $reason
*/
public function fail($reason) {
$fail = $this->fail;
$fail($reason);
}
}
}
} catch (\AssertionError $exception) {
goto development; // zend.assertions=1 and assert.exception=1, use development definition.
}

View File

@ -0,0 +1,29 @@
<?php
namespace Amp\Awaitable\Exception;
class MultiReasonException extends \Exception {
/**
* @var \Throwable[]|\Exception[]
*/
private $reasons;
/**
* @param \Exception[] $reasons Array of exceptions rejecting the awaitable.
* @param string|null $message
*/
public function __construct(array $reasons, $message = null)
{
parent::__construct($message ?: 'Too many awaitables were rejected');
$this->reasons = $reasons;
}
/**
* @return \Throwable[]|\Exception[]
*/
public function getReasons()
{
return $this->reasons;
}
}

View File

@ -0,0 +1,5 @@
<?php
namespace Amp\Awaitable\Exception;
class TimeoutException extends \RuntimeException {}

44
lib/Failure.php Normal file
View File

@ -0,0 +1,44 @@
<?php
namespace Amp\Awaitable;
use Interop\Async\Loop;
use Interop\Async\Awaitable;
class Failure implements Awaitable {
/**
* @var \Exception|\Throwable $exception
*/
private $exception;
/**
* @param \Throwable|\Exception $exception Rejection reason.
*
* @throws \InvalidArgumentException If a non-exception is given.
*/
public function __construct($exception)
{
if (!$exception instanceof \Throwable && !$exception instanceof \Exception) {
throw new \InvalidArgumentException('Failure reason must be an exception');
}
$this->exception = $exception;
}
/**
* {@inheritdoc}
*/
public function when(callable $onResolved) {
try {
$onResolved($this->exception, null);
} catch (\Throwable $exception) {
Loop::defer(static function ($watcher, $exception) {
throw $exception;
}, $exception);
} catch (\Exception $exception) {
Loop::defer(static function ($watcher, $exception) {
throw $exception;
}, $exception);
}
}
}

15
lib/Future.php Normal file
View File

@ -0,0 +1,15 @@
<?php
namespace Amp\Awaitable;
use Interop\Async\Awaitable;
/**
* Awaitable implementation that should not be returned from a public API, but used only internally.
*/
final class Future implements Awaitable {
use Internal\Placeholder {
resolve as public;
fail as public;
}
}

51
lib/Internal/Lazy.php Normal file
View File

@ -0,0 +1,51 @@
<?php
namespace Amp\Awaitable\Internal;
use Amp\Awaitable;
class Lazy implements \Interop\Async\Awaitable {
/**
* @var callable|null
*/
private $provider;
/**
* @var \Interop\Async\Awaitable
*/
private $awaitable;
/**
* @param callable $provider
*/
public function __construct(callable $provider) {
$this->provider = $provider;
}
/**
* @return \Interop\Async\Awaitable
*/
protected function getAwaitable() {
if (null === $this->awaitable) {
$provider = $this->provider;
$this->provider = null;
try {
$this->awaitable = Awaitable\resolve($provider());
} catch (\Throwable $exception) {
$this->awaitable = Awaitable\fail($exception);
} catch (\Exception $exception) {
$this->awaitable = Awaitable\fail($exception);
}
}
return $this->awaitable;
}
/**
* {@inheritdoc}
*/
public function when(callable $onResolved) {
$this->getAwaitable()->when($onResolved);
}
}

View File

@ -0,0 +1,104 @@
<?php
namespace Amp\Awaitable\Internal;
use Amp\Awaitable\Failure;
use Interop\Async\Awaitable;
use Interop\Async\Loop;
trait Placeholder {
/**
* @var bool
*/
private $resolved = false;
/**
* @var mixed
*/
private $result;
/**
* @var callable|\Amp\Awaitable\Internal\WhenQueue|null
*/
private $onResolved;
/**
* {@inheritdoc}
*/
public function when(callable $onResolved) {
if ($this->resolved) {
if ($this->result instanceof Awaitable) {
$this->result->when($onResolved);
} else {
$this->execute($onResolved);
}
return;
}
if (null === $this->onResolved) {
$this->onResolved = $onResolved;
} elseif (!$this->onResolved instanceof WhenQueue) {
$this->onResolved = new WhenQueue($this->onResolved);
$this->onResolved->push($onResolved);
} else {
$this->onResolved->push($onResolved);
}
}
/**
* @param mixed $value
*/
protected function resolve($value = null) {
if ($this->resolved) {
return;
}
$this->resolved = true;
if ($value instanceof Awaitable) {
if ($this === $value) {
$value = new Failure(
new \InvalidArgumentException('Cannot resolve an awaitable with itself')
);
}
$this->result = $value;
if (null !== $this->onResolved) {
$this->result->when($this->onResolved);
}
} else {
$this->result = $value;
if (null !== $this->onResolved) {
$this->execute($this->onResolved);
}
}
$this->onResolved = null;
}
/**
* @param \Throwable|\Exception $reason
*/
protected function fail($reason) {
$this->resolve(new Failure($reason));
}
/**
* @param callable $onResolved
*/
private function execute(callable $onResolved) {
try {
$onResolved(null, $this->result);
} catch (\Throwable $exception) {
Loop::defer(static function ($watcher, $exception) {
throw $exception;
}, $exception);
} catch (\Exception $exception) {
Loop::defer(static function ($watcher, $exception) {
throw $exception;
}, $exception);
}
}
}

View File

@ -0,0 +1,58 @@
<?php
namespace Amp\Awaitable\Internal;
use Interop\Async\Loop;
class WhenQueue {
/**
* @var callable[]
*/
private $queue = [];
/**
* @param callable|null $callback Initial callback to add to queue.
*/
public function __construct(callable $callback = null) {
if (null !== $callback) {
$this->push($callback);
}
}
/**
* Calls each callback in the queue, passing the provided values to the function.
*
* @param \Throwable|\Exception|null $exception
* @param mixed $value
*/
public function __invoke($exception = null, $value = null) {
foreach ($this->queue as $callback) {
try {
$callback($exception, $value);
} catch (\Throwable $exception) {
Loop::defer(static function ($watcher, $exception) {
throw $exception;
}, $exception);
} catch (\Exception $exception) {
Loop::defer(static function ($watcher, $exception) {
throw $exception;
}, $exception);
}
}
}
/**
* Unrolls instances of self to avoid blowing up the call stack on resolution.
*
* @param callable $callback
*/
public function push(callable $callback)
{
if ($callback instanceof self) {
$this->queue = \array_merge($this->queue, $callback->queue);
return;
}
$this->queue[] = $callback;
}
}

46
lib/Promise.php Normal file
View File

@ -0,0 +1,46 @@
<?php
namespace Amp\Awaitable;
use Interop\Async\Awaitable;
/**
* A Promise is an awaitable that provides the functions to resolve or fail the promise to the resolver function
* given to the constructor. A Promise cannot be externally resolved. Only the functions provided to the constructor
* may resolve the Promise.
*/
final class Promise implements Awaitable {
use Internal\Placeholder;
/**
* @param callable(callable $resolve, callable $reject): void $resolver
*/
public function __construct(callable $resolver) {
/**
* Resolves the promise with the given promise or value. If another promise, this promise takes
* on the state of that promise. If a value, the promise will be fulfilled with that value.
*
* @param mixed $value A promise can be resolved with anything other than itself.
*/
$resolve = function ($value = null) {
$this->resolve($value);
};
/**
* Fails the promise with the given exception.
*
* @param \Exception $reason
*/
$fail = function ($reason) {
$this->fail($reason);
};
try {
$resolver($resolve, $fail);
} catch (\Throwable $exception) {
$this->fail($exception);
} catch (\Exception $exception) {
$this->fail($exception);
}
}
}

44
lib/Success.php Normal file
View File

@ -0,0 +1,44 @@
<?php
namespace Amp\Awaitable;
use Interop\Async\Loop;
use Interop\Async\Awaitable;
class Success implements Awaitable {
/**
* @var mixed
*/
private $value;
/**
* @param mixed $value Anything other than an Awaitable object.
*
* @throws \InvalidArgumentException If an awaitable is given as the value.
*/
public function __construct($value)
{
if ($value instanceof Awaitable) {
throw new \InvalidArgumentException('Cannot use an awaitable as success value');
}
$this->value = $value;
}
/**
* {@inheritdoc}
*/
public function when(callable $onResolved) {
try {
$onResolved(null, $this->value);
} catch (\Throwable $exception) {
Loop::defer(static function ($watcher, $exception) {
throw $exception;
}, $exception);
} catch (\Exception $exception) {
Loop::defer(static function ($watcher, $exception) {
throw $exception;
}, $exception);
}
}
}

540
lib/functions.php Normal file
View File

@ -0,0 +1,540 @@
<?php
namespace Amp\Awaitable;
use Interop\Async\Awaitable;
use Interop\Async\Loop;
use Interop\Async\LoopDriver;
if (!\function_exists(__NAMESPACE__ . '\resolve')) {
/**
* Return a awaitable using the given value. There are four possible outcomes depending on the type of $value:
* (1) \Interop\Async\Awaitable: The awaitable is returned without modification.
* (2) \Generator: The generator is used to create a coroutine.
* (3) callable: The callable is invoked with no arguments. The return value is pass through this function agian.
* (4) All other types: A successful awaitable is returned using the given value as the result.
*
* @param mixed $value
*
* @return \Interop\Async\Awaitable
*/
function resolve($value = null) {
if ($value instanceof Awaitable) {
return $value;
}
if ($value instanceof \Generator) {
return new Coroutine($value);
}
if (\is_callable($value)) {
return resolve($value());
}
return new Success($value);
}
/**
* Create a new failed awaitable using the given exception.
*
* @param \Throwable|\Exception $reason
*
* @return \Interop\Async\Awaitable
*/
function fail($reason) {
return new Failure($reason);
}
/**
* Returns a new function that when invoked runs the Generator returned by $worker as a coroutine.
*
* @param callable(mixed ...$args): \Generator $worker
*
* @return callable(mixed ...$args): \Amp\Awaitable\Coroutine
*/
function coroutine(callable $worker) {
return function (/* ...$args */) use ($worker) {
$generator = \call_user_func_array($worker, \func_get_args());
if (!$generator instanceof \Generator) {
throw new \LogicException('The callable did not return a Generator');
}
return new Coroutine($generator);
};
}
/**
* Registers a callback that will forward the failure reason to the Loop error handler if the awaitable fails.
*
* @param \Interop\Async\Awaitable $awaitable
*/
function rethrow(Awaitable $awaitable) {
/**
* @param \Throwable|\Exception $exception
* @param mixed $value
*/
$awaitable->when(function ($exception = null, $value = null) {
if ($exception) {
/** @var \Throwable|\Exception $exception */
throw $exception;
}
});
}
/**
* Runs the event loop until the awaitable is resolved. Should not be called within a running event loop.
*
* @param \Interop\Async\Awaitable $awaitable
* @param \Interop\Async\LoopDriver|null $driver
*
* @return mixed Awaitable success value.
*
* @throws \Throwable|\Exception Awaitable failure reason.
*/
function wait(Awaitable $awaitable, LoopDriver $driver = null) {
Loop::execute(function () use (&$value, &$exception, $awaitable) {
$awaitable->when(function ($e = null, $v = null) use (&$value, &$exception) {
Loop::stop();
$exception = $e;
$value = $v;
});
}, $driver);
if ($exception) {
throw $exception;
}
return $value;
}
/**
* Pipe the promised value through the specified functor once it resolves.
*
* @param \Interop\Async\Awaitable $awaitable
* @param callable(mixed $value): mixed $functor
*
* @return \Interop\Async\Awaitable
*/
function pipe(Awaitable $awaitable, callable $functor) {
$deferred = new Deferred();
$awaitable->when(function ($exception = null, $value = null) use ($deferred, $functor) {
if ($exception) {
$deferred->fail($exception);
return;
}
try {
$deferred->resolve($functor($value));
} catch (\Throwable $exception) {
$deferred->fail($exception);
} catch (\Exception $exception) {
$deferred->fail($exception);
}
});
return $deferred->getAwaitable();
}
/**
* @param \Interop\Async\Awaitable $awaitable
* @param callable(\Throwable|\Exception $exception): mixed $functor
*
* @return \Interop\Async\Awaitable
*/
function capture(Awaitable $awaitable, callable $functor) {
$deferred = new Deferred();
$awaitable->when(function ($exception = null, $value = null) use ($deferred, $functor) {
if (!$exception) {
$deferred->resolve($value);
return;
}
try {
$deferred->resolve($functor($exception));
} catch (\Throwable $exception) {
$deferred->fail($exception);
} catch (\Exception $exception) {
$deferred->fail($exception);
}
});
return $deferred->getAwaitable();
}
/**
* Create an artificial timeout for any Awaitable.
*
* If the timeout expires before the awaitable is resolved, the returned awaitable fails with an instance of
* \Amp\Awaitable\Exception\TimeoutException.
*
* @param \Interop\Async\Awaitable $awaitable
* @param int $timeout Timeout in milliseconds.
*
* @return \Interop\Async\Awaitable
*/
function timeout(Awaitable $awaitable, $timeout) {
$deferred = new Deferred();
$watcher = Loop::delay(function () use ($deferred) {
$deferred->fail(new Exception\TimeoutException());
}, $timeout);
$onResolved = function () use ($awaitable, $deferred, $watcher) {
Loop::cancel($watcher);
$deferred->resolve($awaitable);
};
$awaitable->when($onResolved);
return $deferred->getAwaitable();
}
/**
* Artificially delays the success of an awaitable $delay milliseconds after the awaitable succeeds. If the
* awaitable fails, the returned awaitable fails immediately.
*
* @param \Interop\Async\Awaitable $awaitable
* @param int $delay Delay in milliseconds.
*
* @return \Interop\Async\Awaitable
*/
function delay(Awaitable $awaitable, $delay) {
$deferred = new Deferred();
$onResolved = function ($exception = null) use ($awaitable, $deferred, $delay) {
if ($exception) {
$deferred->fail($exception);
return;
}
Loop::delay(function () use ($awaitable, $deferred) {
$deferred->resolve($awaitable);
}, $delay);
};
$awaitable->when($onResolved);
return $deferred->getAwaitable();
}
/**
* Returns a awaitable that calls $promisor only when the result of the awaitable is requested (e.g., then() or
* done() is called on the returned awaitable). $promisor can return a awaitable or any value. If $promisor throws
* an exception, the returned awaitable is rejected with that exception.
*
* @param callable $promisor
* @param mixed ...$args
*
* @return \Interop\Async\Awaitable
*/
function lazy(callable $promisor /* ...$args */)
{
$args = \array_slice(\func_get_args(), 1);
if (empty($args)) {
return new Internal\Lazy($promisor);
}
return new Internal\Lazy(function () use ($promisor, $args) {
return \call_user_func_array($promisor, $args);
});
}
/**
* Transforms a function that takes a callback into a function that returns a awaitable. The awaitable is fulfilled
* with an array of the parameters that would have been passed to the callback function.
*
* @param callable $worker Function that normally accepts a callback.
* @param int $index Position of callback in $worker argument list (0-indexed).
*
* @return callable
*/
function promisify(callable $worker, $index = 0)
{
return function (/* ...$args */) use ($worker, $index) {
$args = \func_get_args();
$deferred = new Deferred();
$callback = function (/* ...$args */) use ($deferred) {
$deferred->resolve(\func_get_args());
};
if (\count($args) < $index) {
throw new \InvalidArgumentException('Too few arguments given to function');
}
\array_splice($args, $index, 0, [$callback]);
\call_user_func_array($worker, $args);
return $deferred->getAwaitable();
};
}
/**
* Adapts any object with a then(callable $onFulfilled, callable $onRejected) method to a awaitable usable by
* components depending on placeholders implementing Awaitable.
*
* @param object $thenable Object with a then() method.
*
* @return \Interop\Async\Awaitable Awaitable resolved by the $thenable object.
*/
function adapt($thenable)
{
if (!\is_object($thenable) || !\method_exists($thenable, 'then')) {
return fail(new \InvalidArgumentException('Must provide an object with a then() method'));
}
return new Promise(function (callable $resolve, callable $fail) use ($thenable) {
$thenable->then($resolve, $fail);
});
}
/**
* Wraps the given callable $worker in a awaitable aware function that has the same number of arguments as $worker,
* but those arguments may be awaitables for the future argument value or just values. The returned function will
* return a awaitable for the return value of $worker and will never throw. The $worker function will not be called
* until each awaitable given as an argument is fulfilled. If any awaitable provided as an argument fails, the
* awaitable returned by the returned function will be failed for the same reason. The awaitable succeeds with
* the return value of $worker or failed if $worker throws.
*
* @param callable $worker
*
* @return callable
*/
function lift(callable $worker)
{
/**
* @param mixed ...$args Awaitables or values.
*
* @return \Interop\Async\Awaitable
*/
return function (/* ...$args */) use ($worker) {
$args = \func_get_args();
if (1 === \count($args)) {
return pipe(resolve($args[0]), $worker);
}
return pipe(all($args), function (array $args) use ($worker) {
return \call_user_func_array($worker, $args);
});
};
}
/**
* Returns a awaitable that is resolved when all awaitables are resolved. The returned awaitable will not reject by
* itself (only if cancelled). Returned awaitable succeeds with an array of resolved awaitables, with keys
* identical and corresponding to the original given array.
*
* @param mixed[] $awaitables Awaitables or values (passed through resolve() to create awaitables).
*
* @return \Interop\Async\Awaitable
*/
function settle(array $awaitables)
{
if (empty($awaitables)) {
return resolve([]);
}
$deferred = new Deferred();
$pending = \count($awaitables);
$onResolved = function () use (&$awaitables, &$pending, $deferred) {
if (0 === --$pending) {
$deferred->resolve($awaitables);
}
};
foreach ($awaitables as &$awaitable) {
$awaitable = resolve($awaitable);
$awaitable->when($onResolved);
}
return $deferred->getAwaitable();
}
/**
* Returns a awaitable that succeeds when all awaitables succeed, and fails if any awaitable fails. Returned
* awaitable succeeds with an array of values used to succeed each contained awaitable, with keys corresponding to
* the array of awaitables.
*
* @param mixed[] $awaitables Awaitables or values (passed through resolve() to create awaitables).
*
* @return \Interop\Async\Awaitable
*/
function all(array $awaitables)
{
if (empty($awaitables)) {
return resolve([]);
}
$deferred = new Deferred();
$pending = \count($awaitables);
$values = [];
foreach ($awaitables as $key => $awaitable) {
$onResolved = function ($exception = null, $value = null) use ($key, &$values, &$pending, $deferred) {
if ($exception) {
$deferred->fail($exception);
return;
}
$values[$key] = $value;
if (0 === --$pending) {
$deferred->resolve($values);
}
};
resolve($awaitable)->when($onResolved);
}
return $deferred->getAwaitable();
}
/**
* Returns a awaitable that succeeds when any awaitable succeeds, and fails only if all awaitables fail.
*
* @param mixed[] $awaitables Awaitables or values (passed through resolve() to create awaitables).
*
* @return \Interop\Async\Awaitable
*/
function any(array $awaitables)
{
if (empty($awaitables)) {
return fail(new \InvalidArgumentException('No awaitables provided'));
}
$deferred = new Deferred();
$pending = \count($awaitables);
$exceptions = [];
foreach ($awaitables as $key => $awaitable) {
$onResolved = function ($exception = null, $value = null) use ($key, &$exceptions, &$pending, $deferred) {
if (!$exception) {
$deferred->resolve($value);
return;
}
$exceptions[$key] = $exception;
if (0 === --$pending) {
$deferred->fail(new Exception\MultiReasonException($exceptions));
}
};
resolve($awaitable)->when($onResolved);
}
return $deferred->getAwaitable();
}
/**
* Returns a awaitable that succeeds when $required number of awaitables succeed. The awaitable fails if $required
* number of awaitables can no longer succeed.
*
* @param mixed[] $awaitables Awaitables or values (passed through resolve() to create awaitables).
* @param int $required Number of awaitables that must succeed to succeed the returned awaitable.
*
* @return \Interop\Async\Awaitable
*/
function some(array $awaitables, $required)
{
$required = (int) $required;
if (0 >= $required) {
return resolve([]);
}
$pending = \count($awaitables);
if ($required > $pending) {
return fail(new \InvalidArgumentException('Too few awaitables provided'));
}
$deferred = new Deferred();
$required = \min($pending, $required);
$values = [];
$exceptions = [];
foreach ($awaitables as $key => $awaitable) {
$onResolved = function ($exception = null, $value = null) use (
&$key, &$values, &$exceptions, &$pending, &$required, $deferred
) {
if ($exception) {
$exceptions[$key] = $exception;
if ($required > --$pending) {
$deferred->fail(new Exception\MultiReasonException($exceptions));
}
return;
}
$values[$key] = $value;
--$pending;
if (0 === --$required) {
$deferred->resolve($values);
}
};
resolve($awaitable)->when($onResolved);
}
return $deferred->getAwaitable();
}
/**
* Returns a awaitable that succeeds or fails when the first awaitable succeeds or fails.
*
* @param mixed[] $awaitables Awaitables or values (passed through resolve() to create awaitables).
*
* @return \Interop\Async\Awaitable
*/
function choose(array $awaitables)
{
if (empty($awaitables)) {
return fail(new \InvalidArgumentException('No awaitables provided'));
}
$deferred = new Deferred();
foreach ($awaitables as $awaitable) {
resolve($awaitable)->when(function ($exception = null, $value = null) use ($deferred) {
if ($exception) {
$deferred->fail($exception);
return;
}
$deferred->resolve($value);
});
}
return $deferred->getAwaitable();
}
/**
* Maps the callback to each awaitable as it succeeds. Returns an array of awaitables resolved by the return
* callback value of the callback function. The callback may return awaitables or throw exceptions to fail
* awaitables in the array. If a awaitable in the passed array fails, the callback will not be called and the
* awaitable in the array fails for the same reason. Tip: Use all() or settle() to determine when all
* awaitables in the array have been resolved.
*
* @param callable(mixed $value): mixed $callback
* @param mixed[] ...$awaitables Awaitables or values (passed through resolve() to create awaitables).
*
* @return \Interop\Async\Awaitable[] Array of awaitables resolved with the result of the mapped function.
*/
function map(callable $callback /* array ...$awaitables */)
{
$args = \func_get_args();
$args[0] = lift($args[0]);
return \call_user_func_array('array_map', $args);
}
}