1
0
mirror of https://github.com/danog/amp.git synced 2025-01-22 13:21:16 +01:00
amp/lib/functions.php

655 lines
22 KiB
PHP
Raw Normal View History

<?php
2016-08-15 23:46:26 -05:00
namespace Amp {
use React\Promise\PromiseInterface as ReactPromise;
/**
 * Wraps $callback so that it can be handed to a caller that knows nothing about promises or coroutines.
 *
 * Invoking the returned function runs $callback through call(), so a returned Generator is executed as a
 * coroutine. The returned function itself returns nothing; a failure is passed on to the loop's error handler
 * by means of `Amp\Promise\rethrow()`.
 *
 * @param callable(...$args): \Generator|\Amp\Promise|mixed $callback
 *
 * @return callable(...$args): void
 *
 * @see coroutine()
 */
function asyncCoroutine(callable $callback): callable {
    $wrapper = function (...$arguments) use ($callback) {
        $promise = call($callback, ...$arguments);
        Promise\rethrow($promise);
    };

    return $wrapper;
}
/**
 * Wraps $callback so that it can be handed to a caller that understands promises.
 *
 * Invoking the returned function runs $callback through call(), so a returned Generator is executed as a
 * coroutine, and the resulting promise is handed back to the invoker. Nothing observes that promise on the
 * caller's behalf: errors must be handled by whoever invokes the returned function or they go unnoticed.
 *
 * @param callable(mixed ...$args): mixed $callback
 *
 * @return callable(mixed ...$args): \Amp\Promise
 *
 * @see asyncCoroutine()
 */
function coroutine(callable $callback): callable {
    $wrapper = function (...$arguments) use ($callback): Promise {
        $promise = call($callback, ...$arguments);

        return $promise;
    };

    return $wrapper;
}
/**
 * Calls the given function, always returning a promise. If the function returns a Generator, it will be run as a
 * coroutine. If the function throws, a failed promise will be returned.
 *
 * Return values are converted as follows:
 *  - \Generator: wrapped in a Coroutine.
 *  - \Amp\Promise: returned as-is.
 *  - \React\Promise\PromiseInterface: converted using Amp\Promise\adapt().
 *  - anything else: wrapped in a Success.
 *
 * @param callable(mixed ...$args): mixed $callback
 * @param mixed ...$args Arguments to pass to the function.
 *
 * @return \Amp\Promise
 */
function call(callable $callback, ...$args): Promise {
    try {
        $result = $callback(...$args);
    } catch (\Throwable $exception) {
        // Only the synchronous invocation is guarded; the throwable becomes the failure reason.
        return new Failure($exception);
    }

    if ($result instanceof \Generator) {
        return new Coroutine($result);
    }

    if ($result instanceof Promise) {
        return $result;
    }

    if ($result instanceof ReactPromise) {
        return Promise\adapt($result);
    }

    return new Success($result);
}
}
namespace Amp\Promise {
use Amp\Deferred;
2017-04-23 14:39:19 +02:00
use Amp\Loop;
use Amp\MultiReasonException;
use Amp\Promise;
use Amp\Success;
use Amp\TimeoutException;
use React\Promise\PromiseInterface as ReactPromise;
use function Amp\Internal\createTypeError;
2016-05-21 09:44:52 -05:00
/**
 * Registers a callback that will forward the failure reason to the event loop's error handler if the promise fails.
 *
 * Use this function if you neither return the promise nor handle a possible error yourself to prevent errors from
 * going entirely unnoticed.
 *
 * @param \Amp\Promise|\React\Promise\PromiseInterface $promise Promise to register the handler on.
 *
 * @throws \TypeError If $promise is not an instance of \Amp\Promise or \React\Promise\PromiseInterface.
 */
function rethrow($promise) {
    if (!$promise instanceof Promise) {
        if ($promise instanceof ReactPromise) {
            $promise = adapt($promise);
        } else {
            throw createTypeError([Promise::class, ReactPromise::class], $promise);
        }
    }

    $promise->onResolve(function ($exception) {
        // Re-throw the failure reason from inside the resolution callback; successful resolution is ignored.
        if ($exception) {
            throw $exception;
        }
    });
}
/**
 * Runs the event loop until the promise is resolved. Should not be called within a running event loop.
 *
 * Use this function only in synchronous contexts to wait for an asynchronous operation. Use coroutines and yield to
 * await promise resolution in a fully asynchronous application instead.
 *
 * @param \Amp\Promise|\React\Promise\PromiseInterface $promise Promise to wait for.
 *
 * @return mixed Promise success value.
 *
 * @throws \TypeError If $promise is not an instance of \Amp\Promise or \React\Promise\PromiseInterface.
 * @throws \Error If the event loop stopped without the $promise being resolved, or if the loop itself threw
 *                (the thrown throwable is attached as the previous exception).
 * @throws \Throwable Promise failure reason.
 */
function wait($promise) {
    if (!$promise instanceof Promise) {
        if ($promise instanceof ReactPromise) {
            $promise = adapt($promise);
        } else {
            throw createTypeError([Promise::class, ReactPromise::class], $promise);
        }
    }

    // Filled in by reference from the resolution callback below.
    $resolved = false;
    $value = null;
    $exception = null;

    try {
        Loop::run(function () use (&$resolved, &$value, &$exception, $promise) {
            $promise->onResolve(function ($e, $v) use (&$resolved, &$value, &$exception) {
                Loop::stop();
                $resolved = true;
                $exception = $e;
                $value = $v;
            });
        });
    } catch (\Throwable $throwable) {
        throw new \Error("Loop exceptionally stopped without resolving the promise", 0, $throwable);
    }

    if (!$resolved) {
        throw new \Error("Loop stopped without resolving the promise");
    }

    if ($exception) {
        throw $exception;
    }

    return $value;
}
/**
 * Pipes the promised value through the specified functor once it resolves.
 *
 * If $promise fails, the returned promise fails with the same reason and $functor is not invoked. If $functor
 * throws, the returned promise fails with the thrown throwable.
 *
 * @param \Amp\Promise|\React\Promise\PromiseInterface $promise
 * @param callable (mixed $value): mixed $functor
 *
 * @return \Amp\Promise
 *
 * @throws \TypeError If $promise is not an instance of \Amp\Promise or \React\Promise\PromiseInterface.
 */
function pipe($promise, callable $functor): Promise {
    if (!$promise instanceof Promise) {
        if ($promise instanceof ReactPromise) {
            $promise = adapt($promise);
        } else {
            throw createTypeError([Promise::class, ReactPromise::class], $promise);
        }
    }

    $deferred = new Deferred;

    $promise->onResolve(function ($exception, $value) use ($deferred, $functor) {
        if ($exception) {
            $deferred->fail($exception);
            return;
        }

        try {
            $deferred->resolve($functor($value));
        } catch (\Throwable $exception) {
            $deferred->fail($exception);
        }
    });

    return $deferred->promise();
}
/**
 * Handles failures of the given promise that are instances of $className using $functor.
 *
 * A successful $promise resolves the returned promise with the same value. A failure that is not an instance of
 * $className fails the returned promise with the same reason. Otherwise the returned promise is resolved with the
 * return value of $functor, or failed with whatever $functor throws.
 *
 * @param \Amp\Promise|\React\Promise\PromiseInterface $promise
 * @param string $className Throwable class name to capture. Given callback will only be invoked if the failure
 *     reason is an instance of the given throwable class name.
 * @param callable (\Throwable $exception): mixed $functor
 *
 * @return \Amp\Promise
 *
 * @throws \TypeError If $promise is not an instance of \Amp\Promise or \React\Promise\PromiseInterface.
 */
function capture($promise, string $className, callable $functor): Promise {
    if (!$promise instanceof Promise) {
        if ($promise instanceof ReactPromise) {
            $promise = adapt($promise);
        } else {
            throw createTypeError([Promise::class, ReactPromise::class], $promise);
        }
    }

    $deferred = new Deferred;

    $promise->onResolve(function ($exception, $value) use ($deferred, $className, $functor) {
        if (!$exception) {
            $deferred->resolve($value);
            return;
        }

        if (!$exception instanceof $className) {
            $deferred->fail($exception);
            return;
        }

        try {
            $deferred->resolve($functor($exception));
        } catch (\Throwable $exception) {
            $deferred->fail($exception);
        }
    });

    return $deferred->promise();
}
2016-05-22 13:24:39 -05:00
2016-05-21 09:44:52 -05:00
/**
 * Creates an artificial timeout for any `Promise`.
 *
 * If the timeout expires before the promise is resolved, the returned promise fails with an instance of
 * `Amp\TimeoutException`.
 *
 * @param \Amp\Promise|\React\Promise\PromiseInterface $promise Promise to which the timeout is applied.
 * @param int $timeout Timeout in milliseconds.
 *
 * @return \Amp\Promise
 *
 * @throws \TypeError If $promise is not an instance of \Amp\Promise or \React\Promise\PromiseInterface.
 */
function timeout($promise, int $timeout): Promise {
    if (!$promise instanceof Promise) {
        if ($promise instanceof ReactPromise) {
            $promise = adapt($promise);
        } else {
            throw createTypeError([Promise::class, ReactPromise::class], $promise);
        }
    }

    $deferred = new Deferred;

    // $deferred is shared by reference between both callbacks; null means "the timeout already fired".
    $watcher = Loop::delay($timeout, function () use (&$deferred) {
        $temp = $deferred; // prevent double resolve
        $deferred = null;
        $temp->fail(new TimeoutException);
    });
    // NOTE(review): presumably keeps the timer alone from holding the loop open - confirm against Loop::unreference().
    Loop::unreference($watcher);

    $promise->onResolve(function () use (&$deferred, $promise, $watcher) {
        if ($deferred !== null) {
            Loop::cancel($watcher);
            // Resolving with the promise itself forwards its outcome, success or failure.
            $deferred->resolve($promise);
        }
    });

    return $deferred->promise();
}
/**
 * Adapts any object with a done(callable $onFulfilled, callable $onRejected) or then(callable $onFulfilled,
 * callable $onRejected) method to a promise usable by components depending on \Amp\Promise.
 *
 * done() is preferred over then() when the object provides both.
 *
 * @param object $promise Object with a done() or then() method.
 *
 * @return \Amp\Promise Promise resolved by the given object.
 *
 * @throws \Error If the provided value is not an object with a done() or then() method.
 */
function adapt($promise): Promise {
    // Validate before allocating anything. Non-objects are rejected explicitly: method_exists() also accepts
    // class-name strings, which cannot be invoked as an instance.
    if (\is_object($promise) && \method_exists($promise, 'done')) {
        $method = 'done';
    } elseif (\is_object($promise) && \method_exists($promise, 'then')) {
        $method = 'then';
    } else {
        throw new \Error("Object must have a 'then' or 'done' method");
    }

    $deferred = new Deferred;
    $promise->{$method}([$deferred, 'resolve'], [$deferred, 'fail']);

    return $deferred->promise();
}
2016-05-21 09:44:52 -05:00
/**
 * Returns a promise that is resolved when all promises are resolved. The returned promise will not fail.
 * Returned promise succeeds with a two-item array: index 0 holds the failure reasons and index 1 holds the
 * success values, each keyed by the corresponding key of the original given array.
 *
 * This function is the same as some() with the notable exception that it will never fail even
 * if all promises in the array resolve unsuccessfully (it is implemented as some() with zero required successes).
 *
 * @param Promise[] $promises
 *
 * @return \Amp\Promise
 *
 * @throws \Error If a non-Promise is in the array.
 */
function any(array $promises): Promise {
    return some($promises, 0);
}
/**
 * Returns a promise that succeeds when all promises succeed, and fails if any promise fails. Returned
 * promise succeeds with an array of values used to succeed each contained promise, with keys corresponding to
 * the array of promises. The values appear in the same order as the given promises, regardless of the order in
 * which the promises resolve.
 *
 * An empty array yields a promise that has already succeeded with an empty array.
 *
 * @param \Amp\Promise[] $promises Array of only promises.
 *
 * @return \Amp\Promise
 *
 * @throws \Error If a non-Promise is in the array.
 */
function all(array $promises): Promise {
    if (empty($promises)) {
        return new Success([]);
    }

    $deferred = new Deferred;
    // Fetch the promise up front: $deferred is nulled by reference after the first failure.
    $result = $deferred->promise();

    $pending = \count($promises);
    $values = [];

    foreach ($promises as $key => $promise) {
        if ($promise instanceof ReactPromise) {
            $promise = adapt($promise);
        } elseif (!$promise instanceof Promise) {
            throw createTypeError([Promise::class, ReactPromise::class], $promise);
        }

        $values[$key] = null; // reserve the slot so the result keeps the input order

        $promise->onResolve(function ($exception, $value) use (&$deferred, &$values, &$pending, $key) {
            if ($pending === 0) {
                return; // already failed (or fully resolved); ignore late resolutions
            }

            if ($exception) {
                $pending = 0;
                $deferred->fail($exception);
                $deferred = null;
                return;
            }

            $values[$key] = $value;
            if (0 === --$pending) {
                $deferred->resolve($values);
            }
        });
    }

    return $result;
}
/**
 * Returns a promise that succeeds when the first promise succeeds, and fails only if all promises fail.
 *
 * When every promise fails, the returned promise fails with a MultiReasonException built from the failure reasons,
 * keyed like the given array and kept in the same order as the given promises.
 *
 * @param \Amp\Promise[] $promises Array of only promises.
 *
 * @return \Amp\Promise
 *
 * @throws \Error If the array is empty or a non-Promise is in the array.
 */
function first(array $promises): Promise {
    if (empty($promises)) {
        throw new \Error("No promises provided");
    }

    $deferred = new Deferred;
    // Fetch the promise up front: $deferred is nulled by reference after the first success.
    $result = $deferred->promise();

    $pending = \count($promises);
    $exceptions = [];

    foreach ($promises as $key => $promise) {
        if ($promise instanceof ReactPromise) {
            $promise = adapt($promise);
        } elseif (!$promise instanceof Promise) {
            throw createTypeError([Promise::class, ReactPromise::class], $promise);
        }

        $exceptions[$key] = null; // reserve the slot so the reasons keep the input order

        $promise->onResolve(function ($error, $value) use (&$deferred, &$exceptions, &$pending, $key) {
            if ($pending === 0) {
                return; // already succeeded (or fully failed); ignore late resolutions
            }

            if (!$error) {
                $pending = 0;
                $deferred->resolve($value);
                $deferred = null;
                return;
            }

            $exceptions[$key] = $error;
            if (0 === --$pending) {
                $deferred->fail(new MultiReasonException($exceptions));
            }
        });
    }

    return $result;
}
/**
 * Resolves with a two-item array delineating failed and successful Promise results: index 0 holds the failure
 * reasons and index 1 holds the success values. Both are keyed like the given array and keep the order of the
 * given promises, regardless of the order in which the promises resolve.
 *
 * The returned promise is settled only once every given promise has resolved. It fails with a
 * MultiReasonException if fewer than $required promises succeeded. An empty array (with $required of 0) yields a
 * promise that has already succeeded with [[], []].
 *
 * @param \Amp\Promise[] $promises Array of only promises.
 * @param int $required Number of promises that must succeed for the returned promise to succeed.
 *
 * @return \Amp\Promise
 *
 * @throws \Error If $required is negative or greater than the number of promises, or a non-Promise is in the array.
 */
function some(array $promises, int $required = 1): Promise {
    if ($required < 0) {
        throw new \Error("Number of promises required must be non-negative");
    }

    $pending = \count($promises);

    if ($required > $pending) {
        throw new \Error("Too few promises provided");
    }

    if (empty($promises)) {
        return new Success([[], []]);
    }

    $deferred = new Deferred;
    $result = $deferred->promise();
    $values = [];
    $exceptions = [];

    foreach ($promises as $key => $promise) {
        if ($promise instanceof ReactPromise) {
            $promise = adapt($promise);
        } elseif (!$promise instanceof Promise) {
            throw createTypeError([Promise::class, ReactPromise::class], $promise);
        }

        // Reserve a slot in both arrays so each keeps the input order; the unused one is removed on resolution.
        $values[$key] = $exceptions[$key] = null;

        $promise->onResolve(function ($exception, $value) use (
            &$values, &$exceptions, &$pending, $key, $required, $deferred
        ) {
            if ($exception) {
                $exceptions[$key] = $exception;
                unset($values[$key]);
            } else {
                $values[$key] = $value;
                unset($exceptions[$key]);
            }

            if (0 === --$pending) {
                if (\count($values) < $required) {
                    $deferred->fail(new MultiReasonException($exceptions));
                } else {
                    $deferred->resolve([$exceptions, $values]);
                }
            }
        });
    }

    return $result;
}
2016-07-18 23:23:25 -05:00
}
namespace Amp\Stream {
use Amp\Coroutine;
use Amp\Delayed;
use Amp\Emitter;
use Amp\Producer;
use Amp\Promise;
use Amp\Stream;
2017-04-23 14:39:19 +02:00
use Amp\StreamIterator;
use function Amp\Internal\createTypeError;
/**
 * Creates a stream from the given iterable, emitting each value. The iterable may contain promises. If any
 * promise fails, the stream will fail with the same reason.
 *
 * When $delay is non-zero, a Delayed of that length is awaited before each element is emitted.
 *
 * @param array|\Traversable $iterable Elements to emit.
 * @param int $delay Delay between element emissions in milliseconds.
 *
 * @return \Amp\Stream
 *
 * @throws \TypeError If the argument is not an array or instance of \Traversable.
 */
function fromIterable(/* iterable */ $iterable, int $delay = 0): Stream {
    if (!$iterable instanceof \Traversable && !\is_array($iterable)) {
        throw createTypeError(["array", "Traversable"], $iterable);
    }

    return new Producer(function (callable $emit) use ($iterable, $delay) {
        foreach ($iterable as $value) {
            if ($delay) {
                yield new Delayed($delay);
            }

            // Wait for the emit to complete before moving on to the next element.
            yield $emit($value);
        }
    });
}
2016-12-11 16:17:51 +01:00
/**
 * Creates a stream that emits each value of $stream after passing it through $onEmit.
 *
 * Once $stream completes, the returned stream completes with the result of $stream, passed through $onResolve
 * when one is given.
 *
 * @param \Amp\Stream $stream
 * @param callable (mixed $value): mixed $onEmit
 * @param callable (mixed $value): mixed|null $onResolve
 *
 * @return \Amp\Stream
 */
function map(Stream $stream, callable $onEmit, callable $onResolve = null): Stream {
    $streamIterator = new StreamIterator($stream);

    return new Producer(function (callable $emit) use ($streamIterator, $onEmit, $onResolve) {
        while (yield $streamIterator->advance()) {
            yield $emit($onEmit($streamIterator->getCurrent()));
        }

        if ($onResolve === null) {
            return $streamIterator->getResult();
        }

        return $onResolve($streamIterator->getResult());
    });
}
/**
 * Creates a stream that emits only those values of $stream for which $filter returns a truthy value.
 *
 * Once $stream completes, the returned stream completes with the result of $stream.
 *
 * @param \Amp\Stream $stream
 * @param callable (mixed $value): bool $filter
 *
 * @return \Amp\Stream
 */
function filter(Stream $stream, callable $filter): Stream {
    $streamIterator = new StreamIterator($stream);

    return new Producer(function (callable $emit) use ($streamIterator, $filter) {
        while (yield $streamIterator->advance()) {
            // Read the current value once and reuse it for both the test and the emit.
            $current = $streamIterator->getCurrent();

            if ($filter($current)) {
                yield $emit($current);
            }
        }

        return $streamIterator->getResult();
    });
}
/**
 * Creates a stream that emits values emitted from any stream in the array of streams.
 *
 * The returned stream resolves with the array produced by Promise\all() over the given streams once all of them
 * have completed, and fails as soon as any of them fails. Values emitted after such a failure are dropped.
 *
 * @param \Amp\Stream[] $streams
 *
 * @return \Amp\Stream
 *
 * @throws \TypeError If a non-Stream is in the array.
 */
function merge(array $streams): Stream {
    $emitter = new Emitter;
    // Fetch the stream up front: $emitter is nulled by reference after a failure.
    $result = $emitter->stream();

    foreach ($streams as $stream) {
        if (!$stream instanceof Stream) {
            throw createTypeError([Stream::class], $stream);
        }

        $stream->onEmit(function ($value) use (&$emitter) {
            if ($emitter !== null) {
                return $emitter->emit($value);
            }
        });
    }

    Promise\all($streams)->onResolve(function ($exception, array $values = null) use (&$emitter) {
        if ($exception) {
            $emitter->fail($exception);
            $emitter = null;
        } else {
            $emitter->resolve($values);
        }
    });

    return $result;
}
2016-12-11 16:17:51 +01:00
/**
 * Concatenates the given streams into a single stream, emitting values from a single stream at a time. The
 * prior stream must complete before values are emitted from any subsequent stream. Streams are concatenated
 * in the order given (iteration order of the array).
 *
 * The returned stream resolves with the array produced by Promise\all() over the given streams, or fails with
 * the failure reason of that combined promise.
 *
 * @param \Amp\Stream[] $streams
 *
 * @return \Amp\Stream
 *
 * @throws \TypeError If a non-Stream is in the array.
 */
function concat(array $streams): Stream {
    foreach ($streams as $stream) {
        if (!$stream instanceof Stream) {
            throw createTypeError([Stream::class], $stream);
        }
    }

    $emitter = new Emitter;
    $previous = [];
    // Completion of every stream preceding the one currently being subscribed (none yet).
    $promise = Promise\all($previous);

    foreach ($streams as $stream) {
        // One closure is created per stream; its static variables carry state across the values of that stream.
        $generator = function ($value) use ($emitter, $promise) {
            static $pending = true, $failed = false;

            if ($failed) {
                return;
            }

            if ($pending) {
                try {
                    // Hold this value back until all prior streams have completed.
                    yield $promise;
                    $pending = false;
                } catch (\Throwable $exception) {
                    $failed = true;
                    return; // Prior stream failed.
                }
            }

            yield $emitter->emit($value);
        };

        $stream->onEmit(function ($value) use ($generator) {
            return new Coroutine($generator($value));
        });

        $previous[] = $stream;
        $promise = Promise\all($previous);
    }

    // After the loop, $promise covers every given stream.
    $promise->onResolve(function ($exception, array $values = null) use ($emitter) {
        if ($exception) {
            $emitter->fail($exception);
            return;
        }

        $emitter->resolve($values);
    });

    return $emitter->stream();
}
}