2016-05-21 16:44:52 +02:00
|
|
|
<?php
|
|
|
|
|
2016-05-24 05:48:28 +02:00
|
|
|
namespace Amp;
|
2016-05-21 16:44:52 +02:00
|
|
|
|
|
|
|
use Interop\Async\Awaitable;
|
|
|
|
use Interop\Async\Loop;
|
|
|
|
|
2016-05-21 19:19:48 +02:00
|
|
|
/**
 * Wraps a generator-producing callable so that every call of the returned function starts a new coroutine.
 *
 * The returned closure forwards all of its arguments to $worker, checks that the result is a \Generator and
 * passes that generator to a new Coroutine. If $worker returns anything else, the returned closure (not
 * coroutine() itself) throws a \LogicException.
 *
 * @param callable(mixed ...$args): \Generator $worker
 *
 * @return callable(mixed ...$args): \Amp\Coroutine
 */
function coroutine(callable $worker) {
    return function (/* ...$args */) use ($worker) {
        $arguments = \func_get_args();
        $result = \call_user_func_array($worker, $arguments);

        if ($result instanceof \Generator) {
            return new Coroutine($result);
        }

        throw new \LogicException("The callable did not return a Generator");
    };
}
|
2016-05-21 16:44:52 +02:00
|
|
|
|
2016-05-21 19:19:48 +02:00
|
|
|
/**
 * Registers a when() callback on the awaitable that throws the failure reason if the awaitable fails.
 *
 * NOTE(review): the exception is thrown from inside the when() callback; presumably the Awaitable
 * implementation forwards it to the Loop error handler -- confirm against the implementation in use.
 *
 * @param \Interop\Async\Awaitable $awaitable
 */
function rethrow(Awaitable $awaitable) {
    $awaitable->when(function ($reason) {
        if (!$reason) {
            // Succeeded: nothing to rethrow.
            return;
        }

        throw $reason;
    });
}
|
2016-05-21 16:44:52 +02:00
|
|
|
|
2016-05-21 19:19:48 +02:00
|
|
|
/**
 * Runs the event loop until the awaitable is resolved, then returns its value or throws its failure reason.
 * Should not be called within a running event loop.
 *
 * @param \Interop\Async\Awaitable $awaitable
 *
 * @return mixed Awaitable success value.
 *
 * @throws \LogicException If the loop finishes running without the awaitable having been resolved.
 * @throws \Throwable|\Exception Awaitable failure reason.
 */
function wait(Awaitable $awaitable) {
    $done = false;
    $result = null;
    $failure = null;

    // Records the outcome and stops the loop as soon as the awaitable resolves.
    $onResolved = function ($e, $v) use (&$done, &$result, &$failure) {
        Loop::stop();
        $done = true;
        $failure = $e;
        $result = $v;
    };

    Loop::execute(function () use ($awaitable, $onResolved) {
        $awaitable->when($onResolved);
    }, Loop::get());

    if (!$done) {
        throw new \LogicException("Loop emptied without resolving awaitable");
    }

    if ($failure) {
        throw $failure;
    }

    return $result;
}
|
2016-05-21 16:44:52 +02:00
|
|
|
|
2016-05-21 19:19:48 +02:00
|
|
|
/**
 * Passes the success value of the awaitable through $functor once the awaitable resolves.
 *
 * The returned awaitable succeeds with the return value of $functor. It fails if the given awaitable fails
 * (with the same reason, without calling $functor) or if $functor throws (with the thrown exception).
 *
 * @param \Interop\Async\Awaitable $awaitable
 * @param callable(mixed $value): mixed $functor
 *
 * @return \Interop\Async\Awaitable
 */
function pipe(Awaitable $awaitable, callable $functor) {
    $deferred = new Deferred;

    $awaitable->when(function ($exception, $value) use ($deferred, $functor) {
        if (!$exception) {
            try {
                $deferred->resolve($functor($value));
            } catch (\Throwable $error) {
                $deferred->fail($error);
            } catch (\Exception $error) {
                // Second catch keeps exceptions handled on runtimes without \Throwable.
                $deferred->fail($error);
            }

            return;
        }

        $deferred->fail($exception);
    });

    return $deferred->getAwaitable();
}
|
2016-05-21 16:44:52 +02:00
|
|
|
|
2016-05-21 19:19:48 +02:00
|
|
|
/**
 * Handles a failure of the awaitable with $functor if the failure reason is an instance of $className.
 *
 * The returned awaitable succeeds with the original value if the given awaitable succeeds, fails with the
 * original reason if that reason is not an instance of $className, and otherwise is resolved with the return
 * value of $functor (or fails with whatever $functor throws).
 *
 * @param \Interop\Async\Awaitable $awaitable
 * @param string $className Exception class name to capture. Given callback will only be invoked if the failure
 *     reason is an instance of the given exception class name.
 * @param callable(\Throwable|\Exception $exception): mixed $functor
 *
 * @return \Interop\Async\Awaitable
 */
function capture(Awaitable $awaitable, $className, callable $functor) {
    $deferred = new Deferred;

    $awaitable->when(function ($exception, $value) use ($deferred, $className, $functor) {
        if (!$exception) {
            $deferred->resolve($value);
        } elseif (!$exception instanceof $className) {
            // Not the kind of failure we were asked to capture: pass it through untouched.
            $deferred->fail($exception);
        } else {
            try {
                $deferred->resolve($functor($exception));
            } catch (\Throwable $error) {
                $deferred->fail($error);
            } catch (\Exception $error) {
                // Second catch keeps exceptions handled on runtimes without \Throwable.
                $deferred->fail($error);
            }
        }
    });

    return $deferred->getAwaitable();
}
|
2016-05-21 16:44:52 +02:00
|
|
|
|
2016-05-21 19:19:48 +02:00
|
|
|
/**
 * Create an artificial timeout for any Awaitable.
 *
 * If the timeout expires before the awaitable is resolved, the returned awaitable fails with an instance of
 * \Amp\TimeoutException. Otherwise the timer is cancelled and the returned awaitable is resolved with the given
 * awaitable itself.
 *
 * @param \Interop\Async\Awaitable $awaitable
 * @param int $timeout Timeout in milliseconds.
 *
 * @return \Interop\Async\Awaitable
 */
function timeout(Awaitable $awaitable, $timeout) {
    $deferred = new Deferred;
    $settled = false; // Shared by both callbacks so that only the first one to run settles $deferred.

    $watcher = Loop::delay($timeout, function () use (&$settled, $deferred) {
        if ($settled) {
            return;
        }

        $settled = true;
        $deferred->fail(new TimeoutException);
    });

    $awaitable->when(function () use (&$settled, $awaitable, $deferred, $watcher) {
        Loop::cancel($watcher);

        if (!$settled) {
            $settled = true;
            $deferred->resolve($awaitable);
        }
    });

    return $deferred->getAwaitable();
}
|
2016-05-21 16:44:52 +02:00
|
|
|
|
2016-05-21 19:19:48 +02:00
|
|
|
/**
 * Wraps $promisor in an Internal\LazyAwaitable so that it is not invoked by this function itself.
 *
 * Any additional arguments given after $promisor are bound and passed to $promisor when it is eventually
 * invoked. NOTE(review): presumably LazyAwaitable calls $promisor the first time the result is requested and
 * fails the awaitable if $promisor throws -- confirm against Internal\LazyAwaitable.
 *
 * @param callable $promisor
 * @param mixed ...$args
 *
 * @return \Interop\Async\Awaitable
 */
function lazy(callable $promisor /* ...$args */) {
    $bound = \func_get_args();
    \array_shift($bound); // Drop $promisor, leaving only the extra arguments.

    if (!$bound) {
        return new Internal\LazyAwaitable($promisor);
    }

    return new Internal\LazyAwaitable(function () use ($promisor, $bound) {
        return \call_user_func_array($promisor, $bound);
    });
}
|
2016-05-21 16:44:52 +02:00
|
|
|
|
2016-05-21 19:19:48 +02:00
|
|
|
/**
 * Adapts any object with a then(callable $onFulfilled, callable $onRejected) method to an awaitable usable by
 * components depending on placeholders implementing Awaitable.
 *
 * The object's then() method is called with the resolve() and fail() methods of a new Deferred.
 *
 * @param object $thenable Object with a then() method.
 *
 * @return \Interop\Async\Awaitable Awaitable resolved by the $thenable object.
 *
 * @throws \InvalidArgumentException If the provided value is not an object with a then() method.
 */
function adapt($thenable) {
    $isThenable = \is_object($thenable) && \method_exists($thenable, "then");

    if (!$isThenable) {
        throw new \InvalidArgumentException("Must provide an object with a then() method");
    }

    $deferred = new Deferred;
    $onFulfilled = [$deferred, 'resolve'];
    $onRejected = [$deferred, 'fail'];

    $thenable->then($onFulfilled, $onRejected);

    return $deferred->getAwaitable();
}
|
|
|
|
|
|
|
|
/**
 * Wraps the given callable $worker in an awaitable-aware function that accepts the same arguments as $worker,
 * but those arguments may be awaitables for the future argument value or just values. The returned function
 * returns an awaitable for the return value of $worker. The $worker function will not be called until each
 * awaitable given as an argument is fulfilled. If any awaitable provided as an argument fails, the awaitable
 * returned by the returned function fails for the same reason. The awaitable succeeds with the return value of
 * $worker or fails if $worker throws.
 *
 * @param callable $worker
 *
 * @return callable
 */
function lift(callable $worker) {
    /**
     * @param mixed ...$args Awaitables or values.
     *
     * @return \Interop\Async\Awaitable
     */
    return function (/* ...$args */) use ($worker) {
        $args = \func_get_args();

        // Plain values are wrapped so that every argument can be treated as an awaitable.
        foreach ($args as $key => $arg) {
            if (!$arg instanceof Awaitable) {
                $args[$key] = new Success($arg);
            }
        }

        if (1 === \count($args)) {
            return pipe($args[0], $worker);
        }

        return pipe(all($args), function (array $values) use ($worker) {
            // The array from all() is keyed by argument position, but its iteration order is not guaranteed
            // to be positional. call_user_func_array() passes values in iteration order, so sort by key to
            // make sure each value reaches the parameter it was given for.
            \ksort($values);

            return \call_user_func_array($worker, $values);
        });
    };
}
|
2016-05-21 16:44:52 +02:00
|
|
|
|
2016-05-21 19:19:48 +02:00
|
|
|
/**
 * Returns an awaitable that is resolved when all awaitables are resolved. The returned awaitable will not fail.
 * It succeeds with a two-item array: index 0 holds the failure reasons and index 1 holds the success values,
 * each keyed by the key of the corresponding awaitable in the given array. An empty input array yields
 * [[], []].
 *
 * This function is the same as some() with the notable exception that it will never fail even
 * if all awaitables in the array resolve unsuccessfully.
 *
 * @param Awaitable[] $awaitables
 *
 * @return \Interop\Async\Awaitable
 *
 * @throws \InvalidArgumentException If a non-Awaitable is in the array.
 */
function any(array $awaitables) {
    if (!$awaitables) {
        return new Success([[], []]);
    }

    $deferred = new Deferred;

    $remaining = \count($awaitables);
    $failures = [];
    $successes = [];

    foreach ($awaitables as $key => $awaitable) {
        if (!($awaitable instanceof Awaitable)) {
            throw new \InvalidArgumentException("Non-awaitable provided");
        }

        $awaitable->when(function ($error, $value) use (&$remaining, &$failures, &$successes, $key, $deferred) {
            if ($error) {
                $failures[$key] = $error;
            } else {
                $successes[$key] = $value;
            }

            $remaining -= 1;

            if (0 !== $remaining) {
                return;
            }

            // Last outstanding awaitable has resolved: report everything collected.
            $deferred->resolve([$failures, $successes]);
        });
    }

    return $deferred->getAwaitable();
}
|
2016-05-21 16:44:52 +02:00
|
|
|
|
2016-05-21 19:19:48 +02:00
|
|
|
/**
 * Returns an awaitable that succeeds when all awaitables succeed, and fails if any awaitable fails. Returned
 * awaitable succeeds with an array of values used to succeed each contained awaitable, with keys corresponding to
 * the array of awaitables and in the same order as the array of awaitables (regardless of the order in which the
 * awaitables resolve). The returned awaitable fails with the reason of the first awaitable observed to fail.
 *
 * @param Awaitable[] $awaitables
 *
 * @return \Interop\Async\Awaitable
 *
 * @throws \InvalidArgumentException If a non-Awaitable is in the array.
 */
function all(array $awaitables) {
    if (empty($awaitables)) {
        return new Success([]);
    }

    $deferred = new Deferred;

    $pending = \count($awaitables);
    $resolved = false;
    $values = [];

    foreach ($awaitables as $key => $awaitable) {
        if (!$awaitable instanceof Awaitable) {
            throw new \InvalidArgumentException("Non-awaitable provided");
        }

        // Reserve the slot now so the result keeps the order of $awaitables; otherwise entries would be
        // appended in whatever order the awaitables happen to resolve.
        $values[$key] = null;

        $awaitable->when(function ($exception, $value) use (&$values, &$pending, &$resolved, $key, $deferred) {
            if ($resolved) {
                // Already failed; ignore the remaining results.
                return;
            }

            if ($exception) {
                $resolved = true;
                $deferred->fail($exception);
                return;
            }

            $values[$key] = $value;

            if (0 === --$pending) {
                $deferred->resolve($values);
            }
        });
    }

    return $deferred->getAwaitable();
}
|
|
|
|
|
|
|
|
/**
 * Returns an awaitable that succeeds with the value of the first awaitable to succeed, and fails only if all
 * awaitables fail. In that case it fails with a MultiReasonException built from the failure reasons, keyed by
 * the keys of the given array.
 *
 * @param Awaitable[] $awaitables
 *
 * @return \Interop\Async\Awaitable
 *
 * @throws \InvalidArgumentException If the array is empty or a non-Awaitable is in the array.
 */
function first(array $awaitables) {
    if (!$awaitables) {
        throw new \InvalidArgumentException("No awaitables provided");
    }

    $deferred = new Deferred;

    $remaining = \count($awaitables);
    $settled = false;
    $reasons = [];

    foreach ($awaitables as $key => $awaitable) {
        if (!($awaitable instanceof Awaitable)) {
            throw new \InvalidArgumentException("Non-awaitable provided");
        }

        $awaitable->when(function ($exception, $value) use (&$reasons, &$remaining, &$settled, $key, $deferred) {
            if ($settled) {
                return;
            }

            if ($exception) {
                $reasons[$key] = $exception;

                // Only once every awaitable has failed is the result a failure.
                if (0 === --$remaining) {
                    $deferred->fail(new MultiReasonException($reasons));
                }

                return;
            }

            $settled = true;
            $deferred->resolve($value);
        });
    }

    return $deferred->getAwaitable();
}
|
|
|
|
|
|
|
|
/**
 * Returns an awaitable that succeeds when $required number of awaitables succeed. The awaitable fails (with a
 * MultiReasonException built from the failure reasons seen so far) as soon as $required number of awaitables
 * can no longer succeed.
 *
 * On success the value is an array of the success values collected so far, keyed by the keys of the given
 * array. If $required is zero or negative the returned awaitable succeeds immediately with an empty array.
 *
 * @param Awaitable[] $awaitables
 * @param int $required Number of awaitables that must succeed to succeed the returned awaitable.
 *
 * @return \Interop\Async\Awaitable
 *
 * @throws \InvalidArgumentException If fewer than $required awaitables are given or a non-Awaitable is in the
 *     array.
 */
function some(array $awaitables, $required) {
    $required = (int) $required;

    if ($required <= 0) {
        return new Success([]);
    }

    $pending = \count($awaitables);

    if ($pending < $required) {
        throw new \InvalidArgumentException("Too few awaitables provided");
    }

    $deferred = new Deferred;
    $settled = false;
    $successes = [];
    $failures = [];

    foreach ($awaitables as $key => $awaitable) {
        if (!($awaitable instanceof Awaitable)) {
            throw new \InvalidArgumentException("Non-awaitable provided");
        }

        $awaitable->when(function ($exception, $value) use (
            &$successes, &$failures, &$pending, &$settled, &$required, $key, $deferred
        ) {
            if ($settled) {
                return;
            }

            // One fewer awaitable is outstanding, whichever way it resolved.
            --$pending;

            if ($exception) {
                $failures[$key] = $exception;

                // Not enough awaitables remain to reach the number still required.
                if ($pending < $required) {
                    $settled = true;
                    $deferred->fail(new MultiReasonException($failures));
                }

                return;
            }

            $successes[$key] = $value;
            --$required;

            if (0 === $required) {
                $settled = true;
                $deferred->resolve($successes);
            }
        });
    }

    return $deferred->getAwaitable();
}
|
2016-05-21 16:44:52 +02:00
|
|
|
|
2016-05-21 19:19:48 +02:00
|
|
|
/**
 * Returns an awaitable that succeeds or fails when the first awaitable succeeds or fails, using that
 * awaitable's value or failure reason. Results of awaitables resolving later are ignored.
 *
 * @param Awaitable[] $awaitables
 *
 * @return \Interop\Async\Awaitable
 *
 * @throws \InvalidArgumentException If the array is empty or a non-Awaitable is in the array.
 */
function choose(array $awaitables) {
    if (!$awaitables) {
        throw new \InvalidArgumentException("No awaitables provided");
    }

    $deferred = new Deferred;
    $settled = false;

    // The same handler serves every awaitable; only its first invocation has any effect.
    $onResolved = function ($exception, $value) use (&$settled, $deferred) {
        if ($settled) {
            return;
        }

        $settled = true;

        if ($exception) {
            $deferred->fail($exception);
        } else {
            $deferred->resolve($value);
        }
    };

    foreach ($awaitables as $awaitable) {
        if (!($awaitable instanceof Awaitable)) {
            throw new \InvalidArgumentException("Non-awaitable provided");
        }

        $awaitable->when($onResolved);
    }

    return $deferred->getAwaitable();
}
|
2016-05-21 16:44:52 +02:00
|
|
|
|
2016-05-21 19:19:48 +02:00
|
|
|
/**
 * Maps the callback to each awaitable as it succeeds. Returns an array of awaitables resolved by the return
 * value of the callback function. The callback is wrapped with lift() and applied across the given arrays with
 * array_map(), so when several arrays are given the callback receives one value from each array per call.
 * Tip: Use all() or any() to determine when all awaitables in the returned array have been resolved.
 *
 * @param callable(mixed $value): mixed $callback
 * @param Awaitable[] ...$awaitables
 *
 * @return \Interop\Async\Awaitable[] Array of awaitables resolved with the result of the mapped function.
 *
 * @throws \InvalidArgumentException If a non-Awaitable is in any of the arrays.
 */
function map(callable $callback /* array ...$awaitables */) {
    $lifted = lift($callback);
    $arrays = \array_slice(\func_get_args(), 1);

    foreach ($arrays as $awaitables) {
        foreach ($awaitables as $awaitable) {
            if (!($awaitable instanceof Awaitable)) {
                throw new \InvalidArgumentException('Non-awaitable provided');
            }
        }
    }

    // array_map() expects the callback first, followed by the arrays to walk in parallel.
    \array_unshift($arrays, $lifted);

    return \call_user_func_array("array_map", $arrays);
}
|
2016-07-19 06:29:19 +02:00
|
|
|
|
2016-07-19 06:23:25 +02:00
|
|
|
/**
 * Creates an observable that emits each value of the given observable after passing it through $onNext.
 *
 * The value obtained by yielding the given observable is used as the result of the new observable, passed
 * through $onComplete first if one is given.
 *
 * @param \Amp\Observable $observable
 * @param callable(mixed $value): mixed $onNext
 * @param callable(mixed $value): mixed|null $onComplete
 *
 * @return \Amp\Observable
 */
function each(Observable $observable, callable $onNext, callable $onComplete = null) {
    return new Emitter(function (callable $emit) use ($observable, $onNext, $onComplete) {
        $forward = function ($value) use ($emit, $onNext) {
            $mapped = $onNext($value);

            return $emit($mapped);
        };

        $observable->subscribe($forward);

        $result = (yield $observable);

        if ($onComplete !== null) {
            $result = $onComplete($result);
        }

        yield Coroutine::result($result);
    });
}
|
|
|
|
|
|
|
|
/**
 * Creates an observable that emits only those values of the given observable for which $filter returns a
 * truthy value. The value obtained by yielding the given observable is used as the result of the new
 * observable.
 *
 * @param \Amp\Observable $observable
 * @param callable(mixed $value): bool $filter
 *
 * @return \Amp\Observable
 */
function filter(Observable $observable, callable $filter) {
    return new Emitter(function (callable $emit) use ($observable, $filter) {
        $forward = function ($value) use ($emit, $filter) {
            if ($filter($value)) {
                return $emit($value);
            }

            // Rejected values are dropped.
            return null;
        };

        $observable->subscribe($forward);

        $result = (yield $observable);

        yield Coroutine::result($result);
    });
}
|
|
|
|
|
2016-05-24 18:47:14 +02:00
|
|
|
/**
 * Creates an observable that emits values emitted from any observable in the array of observables.
 *
 * Every entry must be an Observable. The merged observable subscribes to each one, waits on all() of them and
 * uses that result as its own; the subscriptions are always unsubscribed afterwards, whether waiting succeeded
 * or failed.
 *
 * @param \Amp\Observable[] $observables
 *
 * @return \Amp\Observable
 *
 * @throws \InvalidArgumentException If a non-Observable is in the array.
 */
function merge(array $observables) {
    foreach ($observables as $candidate) {
        if (!($candidate instanceof Observable)) {
            throw new \InvalidArgumentException("Non-observable provided");
        }
    }

    return new Emitter(function (callable $emit) use ($observables) {
        $subscribers = [];

        foreach ($observables as $source) {
            $subscribers[] = $source->subscribe($emit);
        }

        try {
            $result = (yield all($observables));
        } finally {
            // Release every subscription regardless of how waiting ended.
            foreach ($subscribers as $subscriber) {
                $subscriber->unsubscribe();
            }
        }

        yield Coroutine::result($result);
    });
}
|
|
|
|
|
2016-07-19 06:23:25 +02:00
|
|
|
|
|
|
|
/**
 * Creates an observable from the given array of awaitables, emitting the success value of each provided awaitable
 * or failing if any awaitable fails. Values are emitted in the order the awaitables resolve. The observable is
 * resolved once every awaitable has succeeded (immediately for an empty array).
 *
 * @param \Interop\Async\Awaitable[] $awaitables
 *
 * @return \Amp\Observable
 *
 * @throws \InvalidArgumentException If a non-Awaitable is in the array.
 */
function stream(array $awaitables) {
    $postponed = new Postponed;

    if (empty($awaitables)) {
        $postponed->resolve();
        return $postponed->getObservable();
    }

    $pending = \count($awaitables);
    $onResolved = function ($exception, $value) use (&$pending, $postponed) {
        // $pending doubles as the "finished" flag: it is forced to 0 on failure.
        if ($pending <= 0) {
            return;
        }

        if ($exception) {
            $pending = 0;
            $postponed->fail($exception);
            return;
        }

        $postponed->emit($value);

        if (--$pending === 0) {
            // Finish with resolve(), matching every other use of Postponed in this file
            // (the original called complete() here only).
            $postponed->resolve();
        }
    };

    foreach ($awaitables as $awaitable) {
        if (!$awaitable instanceof Awaitable) {
            throw new \InvalidArgumentException("Non-awaitable provided");
        }

        $awaitable->when($onResolved);
    }

    return $postponed->getObservable();
}
|
|
|
|
|
2016-05-24 18:47:14 +02:00
|
|
|
/**
 * Returns an observable that emits a value every $interval milliseconds, up to $count times. The value emitted
 * is an integer counting how many values the observable has emitted so far, starting at 1. After the last
 * emission the timer is cancelled and the observable is resolved.
 *
 * @param int $interval Time interval between emitted values in milliseconds.
 * @param int $count Number of values to emit. PHP_INT_MAX by default.
 *
 * @return \Amp\Observable
 *
 * @throws \InvalidArgumentException If $count is not a positive value.
 */
function interval($interval, $count = PHP_INT_MAX) {
    $count = (int) $count;

    if ($count <= 0) {
        throw new \InvalidArgumentException("The number of times to emit must be a positive value");
    }

    $postponed = new Postponed;
    $emitted = 0;

    Loop::repeat($interval, function ($watcher) use (&$emitted, $postponed, $count) {
        ++$emitted;
        $postponed->emit($emitted);

        if ($emitted !== $count) {
            return;
        }

        Loop::cancel($watcher);
        $postponed->resolve();
    });

    return $postponed->getObservable();
}
|
|
|
|
|
|
|
|
/**
 * Returns an observable that emits the integers from $start to $end (inclusive) in increments of $step.
 *
 * A positive $step counts upwards and a negative $step counts downwards. The sign of $step must match the
 * direction from $start to $end; when $start equals $end only a positive $step is accepted.
 *
 * @param int $start First value to emit.
 * @param int $end Bound of the range; emitted only if it is reached exactly by stepping.
 * @param int $step Non-zero increment between values. 1 by default.
 *
 * @return \Amp\Observable
 *
 * @throws \InvalidArgumentException If $step is zero or does not have the sign needed to go from $start to $end.
 */
function range($start, $end, $step = 1) {
    $start = (int) $start;
    $end = (int) $end;
    $step = (int) $step;

    if (0 === $step) {
        throw new \InvalidArgumentException("Step must be a non-zero integer");
    }

    // XOR of two integers is negative exactly when their sign bits differ.
    if ((($end - $start) ^ $step) < 0) {
        throw new \InvalidArgumentException("Step is not of the correct sign");
    }

    return new Emitter(function (callable $emit) use ($start, $end, $step) {
        // The bound check depends on direction; a fixed "$i <= $end" would emit nothing for descending ranges.
        for ($i = $start; $step > 0 ? $i <= $end : $i >= $end; $i += $step) {
            yield $emit($i);
        }
    });
}
|