1
0
mirror of https://github.com/danog/amp.git synced 2024-12-02 17:37:50 +01:00
amp/lib/functions.php
Niklas Keller 2f778fe069 Use revolt
Removes deprecated APIs.
2021-03-26 22:34:32 +01:00

600 lines
18 KiB
PHP

<?php
namespace Amp
{
use Revolt\EventLoop\Loop;
use function Revolt\EventLoop\delay;
/**
* Await the resolution of the given promise. The function does not return until the promise has been
* resolved. The promise resolution value is returned or the promise failure reason is thrown.
*
* @template TValue
*
* @param Promise|array<Promise> $promise
*
* @psalm-param Promise<TValue>|array<Promise<TValue>> $promise
*
* @return mixed Promise resolution value.
*
* @throws \Throwable Promise failure reason.
*
* @psalm-return TValue|array<TValue>
*/
function await(Promise|array $promise): mixed
{
if (!$promise instanceof Promise) {
$promise = Promise\all($promise);
}
$suspension = Loop::createSuspension();
$promise->onResolve(static function (?\Throwable $exception, mixed $value) use ($suspension): void {
if ($exception) {
$suspension->throw($exception);
} else {
$suspension->resume($value);
}
});
return $suspension->suspend();
}
/**
* Creates a green thread using the given callable and argument list.
*
* @template TValue
*
* @param callable(mixed ...$args):TValue $callback
* @param mixed ...$args
*
* @return Promise
*
* @psalm-return Promise<TValue>
*/
function async(callable $callback, ...$args): Promise
{
$placeholder = new Internal\Placeholder;
\Revolt\EventLoop\defer(function () use ($placeholder, $callback, $args): void {
try {
$placeholder->resolve($callback(...$args));
} catch (\Throwable $exception) {
$placeholder->fail($exception);
}
});
return new Internal\PrivatePromise($placeholder);
}
function asyncValue(int $delay, mixed $value = null): mixed
{
return async(function () use ($delay, $value) {
delay($delay);
return $value;
});
}
}
namespace Amp\Promise
{
use Amp\Deferred;
use Amp\MultiReasonException;
use Amp\Promise;
use Amp\Success;
use Amp\TimeoutException;
use Revolt\EventLoop\Loop;
use function Amp\async;
use function Amp\await;
use function Amp\Internal\createTypeError;
/**
* Registers a callback that will forward the failure reason to the event loop's error handler if the promise fails.
*
* Use this function if you neither return the promise nor handle a possible error yourself to prevent errors from
* going entirely unnoticed.
*
* @param Promise $promise Promise to register the handler on.
*
* @return void
* @throws \TypeError If $promise is not an instance of \Amp\Promise.
*
*/
function rethrow(Promise $promise): void
{
$promise->onResolve(static function (?\Throwable $exception): void {
if ($exception) {
throw $exception;
}
});
}
/**
* Creates an artificial timeout for any `Promise`.
*
* If the timeout expires before the promise is resolved, the returned promise fails with an instance of
* `Amp\TimeoutException`.
*
* @template TReturn
*
* @param Promise<TReturn> $promise Promise to which the timeout is applied.
* @param int $timeout Timeout in milliseconds.
*
* @return Promise<TReturn>
*
* @throws \TypeError If $promise is not an instance of \Amp\Promise.
*/
function timeout(Promise $promise, int $timeout): Promise
{
$deferred = new Deferred;
$watcher = Loop::delay($timeout, static function () use (&$deferred) {
$temp = $deferred; // prevent double resolve
$deferred = null;
$temp->fail(new TimeoutException);
});
Loop::unreference($watcher);
$promise->onResolve(function () use (&$deferred, $promise, $watcher) {
if ($deferred !== null) {
Loop::cancel($watcher);
$deferred->resolve($promise);
}
});
return $deferred->promise();
}
/**
* Creates an artificial timeout for any `Promise`.
*
* If the promise is resolved before the timeout expires, the result is returned
*
* If the timeout expires before the promise is resolved, a default value is returned
*
* @template TReturn
*
* @param Promise<TReturn> $promise Promise to which the timeout is applied.
* @param int $timeout Timeout in milliseconds.
* @param TReturn $default
*
* @return Promise<TReturn>
*
* @throws \TypeError If $promise is not an instance of \Amp\Promise.
*/
function timeoutWithDefault(Promise $promise, int $timeout, mixed $default = null): Promise
{
$promise = timeout($promise, $timeout);
return async(static function () use ($promise, $default) {
try {
return await($promise);
} catch (TimeoutException $exception) {
return $default;
}
});
}
/**
* Adapts any object with a done(callable $onFulfilled, callable $onRejected) or then(callable $onFulfilled,
* callable $onRejected) method to a promise usable by components depending on placeholders implementing
* \AsyncInterop\Promise.
*
* @param object $promise Object with a done() or then() method.
*
* @return Promise Promise resolved by the $thenable object.
*
* @throws \Error If the provided object does not have a then() method.
*/
function adapt(object $promise): Promise
{
$deferred = new Deferred;
if (\method_exists($promise, 'done')) {
$promise->done([$deferred, 'resolve'], [$deferred, 'fail']);
} elseif (\method_exists($promise, 'then')) {
$promise->then([$deferred, 'resolve'], [$deferred, 'fail']);
} else {
throw new \Error("Object must have a 'then' or 'done' method");
}
return $deferred->promise();
}
/**
* Returns a promise that is resolved when all promises are resolved. The returned promise will not fail.
* Returned promise succeeds with a two-item array delineating successful and failed promise results,
* with keys identical and corresponding to the original given array.
*
* This function is the same as some() with the notable exception that it will never fail even
* if all promises in the array resolve unsuccessfully.
*
* @param Promise[] $promises
*
* @return Promise
*
* @throws \Error If a non-Promise is in the array.
*/
function any(array $promises): Promise
{
return some($promises, 0);
}
/**
* Returns a promise that succeeds when all promises succeed, and fails if any promise fails. Returned
* promise succeeds with an array of values used to succeed each contained promise, with keys corresponding to
* the array of promises.
*
* @param Promise[] $promises Array of only promises.
*
* @return Promise
*
* @throws \Error If a non-Promise is in the array.
*
* @template TValue
*
* @psalm-param array<array-key, Promise<TValue>> $promises
* @psalm-assert array<array-key, Promise<TValue>> $promises $promises
* @psalm-return Promise<array<array-key, TValue>>
*/
function all(array $promises): Promise
{
if (empty($promises)) {
return new Success([]);
}
$deferred = new Deferred;
$result = $deferred->promise();
$pending = \count($promises);
$values = [];
foreach ($promises as $key => $promise) {
if (!$promise instanceof Promise) {
throw createTypeError([Promise::class], $promise);
}
$values[$key] = null; // add entry to array to preserve order
$promise->onResolve(function ($exception, $value) use (&$deferred, &$values, &$pending, $key) {
if ($pending === 0) {
return;
}
if ($exception) {
$pending = 0;
$deferred->fail($exception);
$deferred = null;
return;
}
$values[$key] = $value;
if (0 === --$pending) {
$deferred->resolve($values);
}
});
}
return $result;
}
/**
* Returns a promise that succeeds when the first promise succeeds, and fails only if all promises fail.
*
* @param Promise[] $promises Array of only promises.
*
* @return Promise
*
* @throws \Error If the array is empty or a non-Promise is in the array.
*/
function first(array $promises): Promise
{
if (empty($promises)) {
throw new \Error("No promises provided");
}
$deferred = new Deferred;
$result = $deferred->promise();
$pending = \count($promises);
$exceptions = [];
foreach ($promises as $key => $promise) {
if (!$promise instanceof Promise) {
throw createTypeError([Promise::class], $promise);
}
$exceptions[$key] = null; // add entry to array to preserve order
$promise->onResolve(function ($error, $value) use (&$deferred, &$exceptions, &$pending, $key) {
if ($pending === 0) {
return;
}
if (!$error) {
$pending = 0;
$deferred->resolve($value);
$deferred = null;
return;
}
$exceptions[$key] = $error;
if (0 === --$pending) {
$deferred->fail(new MultiReasonException($exceptions));
}
});
}
return $result;
}
/**
* Resolves with a two-item array delineating successful and failed Promise results.
*
* The returned promise will only fail if the given number of required promises fail.
*
* @param Promise[] $promises Array of only promises.
* @param int $required Number of promises that must succeed for the
* returned promise to succeed.
*
* @return Promise
*
* @throws \Error If a non-Promise is in the array.
*/
function some(array $promises, int $required = 1): Promise
{
if ($required < 0) {
throw new \Error("Number of promises required must be non-negative");
}
$pending = \count($promises);
if ($required > $pending) {
throw new \Error("Too few promises provided");
}
if (empty($promises)) {
return new Success([[], []]);
}
$deferred = new Deferred;
$result = $deferred->promise();
$values = [];
$exceptions = [];
foreach ($promises as $key => $promise) {
if (!$promise instanceof Promise) {
throw createTypeError([Promise::class], $promise);
}
$values[$key] = $exceptions[$key] = null; // add entry to arrays to preserve order
$promise->onResolve(static function ($exception, $value) use (
&$values,
&$exceptions,
&$pending,
$key,
$required,
$deferred
) {
if ($exception) {
$exceptions[$key] = $exception;
unset($values[$key]);
} else {
$values[$key] = $value;
unset($exceptions[$key]);
}
if (0 === --$pending) {
if (\count($values) < $required) {
$deferred->fail(new MultiReasonException($exceptions));
} else {
$deferred->resolve([$exceptions, $values]);
}
}
});
}
return $result;
}
}
namespace Amp\Pipeline
{
use Amp\AsyncGenerator;
use Amp\Pipeline;
use Amp\PipelineSource;
use Amp\Promise;
use function Amp\async;
use function Amp\await;
use function Amp\Internal\createTypeError;
use function Revolt\EventLoop\delay;
/**
* Creates a pipeline from the given iterable, emitting the each value. The iterable may contain promises. If any
* promise fails, the returned pipeline will fail with the same reason.
*
* @template TValue
*
* @param iterable $iterable Elements to emit.
* @param int $delay Delay between elements emitted in milliseconds.
*
* @psalm-param iterable<TValue> $iterable
*
* @return Pipeline
*
* @psalm-return Pipeline<TValue>
*
* @throws \TypeError If the argument is not an array or instance of \Traversable.
*/
function fromIterable(iterable $iterable, int $delay = 0): Pipeline
{
return new AsyncGenerator(static function () use ($iterable, $delay): \Generator {
foreach ($iterable as $value) {
if ($delay) {
delay($delay);
}
if ($value instanceof Promise) {
$value = await($value);
}
yield $value;
}
});
}
/**
* @template TValue
* @template TReturn
*
* @param Pipeline $pipeline
* @param callable(TValue $value):TReturn $onEmit
*
* @psalm-param Pipeline<TValue> $pipeline
*
* @return Pipeline
*
* @psalm-return Pipeline<TReturn>
*/
function map(Pipeline $pipeline, callable $onEmit): Pipeline
{
return new AsyncGenerator(static function () use ($pipeline, $onEmit): \Generator {
while (null !== $value = $pipeline->continue()) {
yield $onEmit($value);
}
});
}
/**
* @template TValue
*
* @param Pipeline $pipeline
* @param callable(TValue $value):bool $filter
*
* @psalm-param Pipeline<TValue> $pipeline
*
* @return Pipeline
*
* @psalm-return Pipeline<TValue>
*/
function filter(Pipeline $pipeline, callable $filter): Pipeline
{
return new AsyncGenerator(static function () use ($pipeline, $filter): \Generator {
while (null !== $value = $pipeline->continue()) {
if ($filter($value)) {
yield $value;
}
}
});
}
/**
* Creates a pipeline that emits values emitted from any pipeline in the array of pipelines.
*
* @param Pipeline[] $pipelines
*
* @return Pipeline
*/
function merge(array $pipelines): Pipeline
{
$source = new PipelineSource;
$result = $source->pipe();
$promises = [];
foreach ($pipelines as $pipeline) {
if (!$pipeline instanceof Pipeline) {
throw createTypeError([Pipeline::class], $pipeline);
}
$promises[] = async(static function () use (&$source, $pipeline) {
while ((null !== $value = $pipeline->continue()) && $source !== null) {
$source->yield($value);
}
});
}
Promise\all($promises)->onResolve(static function ($exception) use (&$source) {
$temp = $source;
$source = null;
if ($exception) {
$temp->fail($exception);
} else {
$temp->complete();
}
});
return $result;
}
/**
* Concatenates the given pipelines into a single pipeline, emitting from a single pipeline at a time. The
* prior pipeline must complete before values are emitted from any subsequent pipelines. Streams are concatenated
* in the order given (iteration order of the array).
*
* @param Pipeline[] $pipelines
*
* @return Pipeline
*/
function concat(array $pipelines): Pipeline
{
foreach ($pipelines as $pipeline) {
if (!$pipeline instanceof Pipeline) {
throw createTypeError([Pipeline::class], $pipeline);
}
}
return new AsyncGenerator(function () use ($pipelines): \Generator {
foreach ($pipelines as $stream) {
while ($value = $stream->continue()) {
yield $value;
}
}
});
}
/**
* Discards all remaining items and returns the number of discarded items.
*
* @template TValue
*
* @param Pipeline $pipeline
*
* @psalm-param Pipeline<TValue> $pipeline
*
* @return Promise<int>
*/
function discard(Pipeline $pipeline): Promise
{
return async(static function () use ($pipeline): int {
$count = 0;
while (null !== $pipeline->continue()) {
$count++;
}
return $count;
});
}
/**
* Collects all items from a pipeline into an array.
*
* @template TValue
*
* @param Pipeline $pipeline
*
* @psalm-param Pipeline<TValue> $pipeline
*
* @return array
*
* @psalm-return array<int, TValue>
*/
function toArray(Pipeline $pipeline): array
{
return \iterator_to_array($pipeline);
}
}