1
0
mirror of https://github.com/danog/amp.git synced 2024-12-11 08:59:46 +01:00
amp/lib/functions.php

1035 lines
31 KiB
PHP
Raw Normal View History

<?php
2016-08-16 06:46:26 +02:00
2018-06-18 20:00:01 +02:00
namespace Amp
{
2020-09-29 21:25:42 +02:00
2020-09-24 18:52:22 +02:00
/**
 * Suspends the current fiber until the given promise has been resolved.
 *
 * When an array is given, the promises are first combined using {@see Promise\all()}. The resolution value is
 * handed back to the caller; a failure reason is thrown from this function instead.
 *
 * @template TValue
 *
 * @param Promise|array<Promise> $promise Promise, or array of promises, to await.
 *
 * @psalm-param Promise<TValue>|array<Promise<TValue>> $promise
 *
 * @return mixed Promise resolution value (an array of values when an array of promises was given).
 *
 * @throws \Throwable Promise failure reason.
 *
 * @psalm-return TValue|array<TValue>
 */
function await(Promise|array $promise): mixed
{
    $awaited = $promise instanceof Promise ? $promise : Promise\all($promise);

    // Invoked with the continuation of the suspended fiber; resumes it once the promise settles.
    $subscribe = static function (\Continuation $continuation) use ($awaited) {
        return $awaited->onResolve(
            static function (?\Throwable $exception, mixed $value) use ($continuation) {
                if ($exception === null) {
                    return $continuation->resume($value);
                }

                return $continuation->throw($exception);
            }
        );
    };

    return \Fiber::suspend($subscribe, Loop::get());
}
/**
 * Runs the given callable in a new green thread (fiber) and returns a promise for its outcome.
 *
 * The fiber is not started right away: it is scheduled through {@see Loop::defer()}. The returned promise is
 * resolved with the callable's return value, or failed with any exception thrown while running it.
 *
 * @template TValue
 *
 * @param callable(mixed ...$args):TValue $callback Function to execute inside the fiber.
 * @param mixed ...$args Arguments forwarded to $callback.
 *
 * @return Promise
 *
 * @psalm-return Promise<TValue>
 */
function async(callable $callback, mixed ...$args): Promise
{
    $placeholder = new Internal\Placeholder;

    $task = static function () use ($placeholder, $callback, $args): void {
        try {
            $result = $callback(...$args);
            $placeholder->resolve($result);
        } catch (\Throwable $exception) {
            $placeholder->fail($exception);
        }
    };

    Loop::defer(static fn() => \Fiber::run($task));

    return new Internal\PrivatePromise($placeholder);
}
/**
 * Wraps a callable so that every invocation of the returned function starts a new green thread via
 * {@see async()}. Arguments passed to the returned function are forwarded to $callback, and the promise created
 * by async() is returned.
 *
 * @param callable $callback Function to run in a new green thread on each invocation.
 *
 * @return callable(mixed ...$args):Promise Function that creates a new green thread each time it is called.
 */
function asyncCallable(callable $callback): callable
{
    return static function (mixed ...$args) use ($callback): Promise {
        return async($callback, ...$args);
    };
}
2020-10-04 17:22:21 +02:00
/**
 * Runs the given callback in a new green thread using {@see async()}. The resulting promise is passed to
 * {@see Promise\rethrow()}, so a failure is forwarded to the event loop error handler instead of being returned.
 *
 * @param callable(mixed ...$args):void $callback Function to execute.
 * @param mixed ...$args Arguments forwarded to $callback.
 */
function defer(callable $callback, mixed ...$args): void
{
    $promise = async($callback, ...$args);

    Promise\rethrow($promise);
}
/**
 * Creates a promise-returning wrapper around $callback.
 *
 * Each invocation of the returned function passes $callback and the given arguments to {@see call()}, so
 * Generators are run as coroutines and the result is always a promise. Errors must be handled by whoever
 * invokes the returned function; otherwise they go unnoticed.
 *
 * Use this function to create a coroutine-aware callable for a promise-aware callback caller.
 *
 * @template TReturn
 * @template TPromise
 * @template TGeneratorReturn
 * @template TGeneratorPromise
 *
 * @template TGenerator as TGeneratorReturn|Promise<TGeneratorPromise>
 * @template T as TReturn|Promise<TPromise>|\Generator<mixed, mixed, mixed, TGenerator>
 *
 * @formatter:off
 *
 * @param callable(...mixed): T $callback
 *
 * @return callable
 * @psalm-return (T is Promise ? (callable(mixed...): Promise<TPromise>) : (T is \Generator ? (TGenerator is Promise ? (callable(mixed...): Promise<TGeneratorPromise>) : (callable(mixed...): Promise<TGeneratorReturn>)) : (callable(mixed...): Promise<TReturn>)))
 *
 * @formatter:on
 *
 * @see asyncCoroutine()
 *
 * @psalm-suppress InvalidReturnType
 *
 * @deprecated No longer necessary with ext-fiber
 */
function coroutine(callable $callback): callable
{
    /** @psalm-suppress InvalidReturnStatement */
    return static function (...$args) use ($callback): Promise {
        return call($callback, ...$args);
    };
}
/**
 * Creates a wrapper around $callback that never exposes a promise to its caller.
 *
 * Each invocation of the returned function runs $callback through {@see call()} (so Generators are run as
 * coroutines) and hands the resulting promise to `Amp\Promise\rethrow()`, which forwards failures to the
 * loop's error handler.
 *
 * Use this function to create a coroutine-aware callable for a non-promise-aware callback caller.
 *
 * @param callable(...mixed): mixed $callback
 *
 * @return callable
 * @psalm-return callable(mixed...): void
 *
 * @see coroutine()
 *
 * @deprecated No longer necessary with ext-fiber
 */
function asyncCoroutine(callable $callback): callable
{
    return static function (...$args) use ($callback) {
        $promise = call($callback, ...$args);

        Promise\rethrow($promise);
    };
}
/**
 * Invokes the given function and normalizes its outcome into a promise.
 *
 * - A thrown exception becomes a failed promise ({@see Failure}).
 * - A returned Generator is run as a coroutine ({@see Coroutine}).
 * - A returned promise is passed through unchanged.
 * - Any other return value becomes a successful promise ({@see Success}).
 *
 * @template TReturn
 * @template TPromise
 * @template TGeneratorReturn
 * @template TGeneratorPromise
 *
 * @template TGenerator as TGeneratorReturn|Promise<TGeneratorPromise>
 * @template T as TReturn|Promise<TPromise>|\Generator<mixed, mixed, mixed, TGenerator>
 *
 * @formatter:off
 *
 * @param callable(...mixed): T $callback
 * @param mixed ...$args Arguments to pass to the function.
 *
 * @return Promise
 * @psalm-return (T is Promise ? Promise<TPromise> : (T is \Generator ? (TGenerator is Promise ? Promise<TGeneratorPromise> : Promise<TGeneratorReturn>) : Promise<TReturn>))
 *
 * @formatter:on
 *
 * @deprecated No longer necessary with ext-fiber
 */
function call(callable $callback, mixed ...$args): Promise
{
    try {
        $outcome = $callback(...$args);
    } catch (\Throwable $thrown) {
        return new Failure($thrown);
    }

    // Generators are checked first, exactly as in the sequential form of this dispatch.
    return match (true) {
        $outcome instanceof \Generator => new Coroutine($outcome),
        $outcome instanceof Promise => $outcome,
        default => new Success($outcome),
    };
}
2017-05-03 15:21:49 +02:00
/**
 * Invokes the given function through {@see call()} and forwards any failure to the loop error handler.
 *
 * If the function returns a Generator, it is run as a coroutine. If the function throws or returns a failing
 * promise, the failure is passed on via {@see Promise\rethrow()}.
 *
 * @param callable(...mixed): mixed $callback
 * @param mixed ...$args Arguments to pass to the function.
 *
 * @return void
 *
 * @deprecated No longer necessary with ext-fiber
 */
function asyncCall(callable $callback, mixed ...$args): void
{
    $promise = call($callback, ...$args);

    Promise\rethrow($promise);
}
2019-08-02 22:37:42 +02:00
2019-11-11 20:02:09 +01:00
/**
 * Suspends the current fiber for the specified number of milliseconds without blocking the event loop.
 *
 * A timer is registered with {@see Loop::delay()}; its callback resumes the suspended fiber.
 *
 * @param int $milliseconds Number of milliseconds to sleep.
 */
function delay(int $milliseconds): void
{
    $schedule = static function (\Continuation $continuation) use ($milliseconds) {
        return Loop::delay(
            $milliseconds,
            static function () use ($continuation) {
                return $continuation->resume();
            }
        );
    };

    \Fiber::suspend($schedule, Loop::get());
}
2020-09-25 05:14:58 +02:00
2020-10-10 16:06:49 +02:00
/**
 * Suspends the current fiber until one of the given signals is delivered to the process.
 *
 * A watcher is registered with {@see Loop::onSignal()} for every signal. When the first one fires, all
 * watchers are cancelled and the fiber is resumed with the number of the signal received.
 *
 * @param int $signal Required signal to await.
 * @param int ...$signals Additional signals to await.
 *
 * @return int The signal number received.
 */
function signal(int $signal, int ...$signals): int
{
    // The required signal is appended after the additional ones; registration follows that order.
    $signals[] = $signal;

    $register = static function (\Continuation $continuation) use ($signals): void {
        $watcherIds = [];

        // Shared by every watcher; captures the id list by reference so it sees all registrations.
        $onSignal = static function (string $id, int $received) use (&$watcherIds, $continuation): void {
            foreach ($watcherIds as $watcherId) {
                Loop::cancel($watcherId);
            }

            $continuation->resume($received);
        };

        foreach ($signals as $signo) {
            $watcherIds[] = Loop::onSignal($signo, $onSignal);
        }
    };

    return \Fiber::suspend($register, Loop::get());
}
2020-09-25 05:14:58 +02:00
/**
 * Returns the current time relative to an arbitrary point in time.
 *
 * Thin public wrapper that delegates to {@see Internal\getCurrentTime()}.
 *
 * @return int Time in milliseconds.
 */
function getCurrentTime(): int
{
    $now = Internal\getCurrentTime();

    return $now;
}
2017-02-22 22:52:30 +01:00
}
2018-06-18 20:00:01 +02:00
namespace Amp\Promise
{
2020-09-29 21:25:42 +02:00
use Amp\Deferred;
2017-04-23 14:39:19 +02:00
use Amp\Loop;
use Amp\MultiReasonException;
use Amp\Promise;
use Amp\Success;
use Amp\TimeoutException;
use function Amp\async;
use function Amp\await;
use function Amp\Internal\createTypeError;
2016-05-21 16:44:52 +02:00
/**
 * Registers a callback on the promise that rethrows the failure reason, which forwards it to the event loop's
 * error handler if the promise fails.
 *
 * Use this function if you neither return the promise nor handle a possible error yourself to prevent errors from
 * going entirely unnoticed.
 *
 * @param Promise $promise Promise to register the handler on.
 *
 * @return void
 *
 * @throws \TypeError If $promise is not an instance of \Amp\Promise.
 */
function rethrow(Promise $promise): void
{
    $rethrower = static function (?\Throwable $exception): void {
        if ($exception !== null) {
            throw $exception;
        }
    };

    $promise->onResolve($rethrower);
}
/**
 * Waits for the given promise by delegating to {@see await()}.
 *
 * @template TPromise
 * @template T as Promise<TPromise>
 *
 * @param Promise $promise Promise to wait for.
 *
 * @return mixed Promise success value.
 *
 * @psalm-param T $promise
 * @psalm-return (T is Promise ? TPromise : mixed)
 *
 * @throws \Throwable Promise failure reason.
 *
 * @deprecated Use {@see await()} instead.
 */
function wait(Promise $promise): mixed
{
    $value = await($promise);

    return $value;
}
2016-05-21 16:44:52 +02:00
/**
 * Creates an artificial timeout for any `Promise`.
 *
 * If the timeout expires before the promise is resolved, the returned promise fails with an instance of
 * `Amp\TimeoutException`. Otherwise the timer is cancelled and the returned promise is resolved with the given
 * promise.
 *
 * @template TReturn
 *
 * @param Promise<TReturn> $promise Promise to which the timeout is applied.
 * @param int $timeout Timeout in milliseconds.
 *
 * @return Promise<TReturn>
 *
 * @throws \TypeError If $promise is not an instance of \Amp\Promise.
 */
function timeout(Promise $promise, int $timeout): Promise
{
    $deferred = new Deferred;

    $expire = static function () use (&$deferred) {
        // Detach the deferred before failing it so the promise callback below cannot resolve it a second time.
        $expired = $deferred;
        $deferred = null;
        $expired->fail(new TimeoutException);
    };

    $watcher = Loop::delay($timeout, $expire);
    Loop::unreference($watcher);

    $promise->onResolve(static function () use (&$deferred, $promise, $watcher) {
        if ($deferred === null) {
            return; // The timeout already fired and failed the returned promise.
        }

        Loop::cancel($watcher);
        $deferred->resolve($promise);
    });

    return $deferred->promise();
}
2018-11-25 17:56:42 +01:00
/**
 * Creates an artificial timeout for any `Promise`, substituting a default value on expiry.
 *
 * The promise is wrapped with {@see timeout()} and awaited in a new green thread. If it resolves before the
 * timeout expires, its result is returned. If a `TimeoutException` is thrown while awaiting, $default is
 * returned instead. Any other failure is propagated.
 *
 * @template TReturn
 *
 * @param Promise<TReturn> $promise Promise to which the timeout is applied.
 * @param int $timeout Timeout in milliseconds.
 * @param TReturn $default Value used when the timeout expires.
 *
 * @return Promise<TReturn>
 *
 * @throws \TypeError If $promise is not an instance of \Amp\Promise.
 */
function timeoutWithDefault(Promise $promise, int $timeout, mixed $default = null): Promise
{
    $limited = timeout($promise, $timeout);

    return async(static function () use ($limited, $default) {
        try {
            $value = await($limited);
        } catch (TimeoutException) {
            $value = $default;
        }

        return $value;
    });
}
/**
 * Adapts any object with a done(callable $onFulfilled, callable $onRejected) or then(callable $onFulfilled,
 * callable $onRejected) method to a promise.
 *
 * done() is preferred when the object has both methods. The fulfilment callback resolves the returned promise
 * and the rejection callback fails it.
 *
 * @param object $promise Object with a done() or then() method.
 *
 * @return Promise Promise resolved by the given object.
 *
 * @throws \Error If the provided object has neither a done() nor a then() method.
 */
function adapt(object $promise): Promise
{
    $deferred = new Deferred;

    $method = match (true) {
        \method_exists($promise, 'done') => 'done',
        \method_exists($promise, 'then') => 'then',
        default => throw new \Error("Object must have a 'then' or 'done' method"),
    };

    $promise->{$method}([$deferred, 'resolve'], [$deferred, 'fail']);

    return $deferred->promise();
}
2016-05-21 16:44:52 +02:00
/**
 * Returns a promise that is resolved once all given promises are resolved, regardless of their outcome.
 *
 * This is {@see some()} with zero required successes: the returned promise does not fail even if every promise
 * in the array fails. It succeeds with the same two-item array that some() produces, with keys identical and
 * corresponding to the original given array.
 *
 * @param Promise[] $promises
 *
 * @return Promise
 *
 * @throws \Error If a non-Promise is in the array.
 */
function any(array $promises): Promise
{
    $noneRequired = 0;

    return some($promises, $noneRequired);
}
/**
 * Returns a promise that succeeds when all promises succeed, and fails as soon as any promise fails.
 *
 * The returned promise succeeds with an array of the values used to succeed each contained promise, with keys
 * corresponding to the array of promises. An empty input array yields an already successful promise holding
 * an empty array.
 *
 * @param Promise[] $promises Array of only promises.
 *
 * @return Promise
 *
 * @throws \Error If a non-Promise is in the array.
 *
 * @template TValue
 *
 * @psalm-param array<array-key, Promise<TValue>> $promises
 * @psalm-assert array<array-key, Promise<TValue>> $promises $promises
 * @psalm-return Promise<array<array-key, TValue>>
 */
function all(array $promises): Promise
{
    if ($promises === []) {
        return new Success([]);
    }

    $deferred = new Deferred;
    $result = $deferred->promise();

    $remaining = \count($promises);
    $resolved = [];

    foreach ($promises as $index => $item) {
        if (!($item instanceof Promise)) {
            throw createTypeError([Promise::class], $item);
        }

        // Reserve the slot up front so the result keeps the order of the input array.
        $resolved[$index] = null;

        $item->onResolve(function ($error, $value) use (&$deferred, &$resolved, &$remaining, $index) {
            if ($remaining === 0) {
                return; // The combined promise has already been settled.
            }

            if ($error) {
                // First failure wins: zero the counter so later callbacks become no-ops.
                $remaining = 0;
                $deferred->fail($error);
                $deferred = null;
                return;
            }

            $resolved[$index] = $value;
            $remaining--;

            if ($remaining === 0) {
                $deferred->resolve($resolved);
            }
        });
    }

    return $result;
}
/**
 * Returns a promise that succeeds with the value of the first promise to succeed, and fails only if all
 * promises fail. In that case the failure is a {@see MultiReasonException} carrying every failure reason,
 * keyed like the input array.
 *
 * @param Promise[] $promises Array of only promises.
 *
 * @return Promise
 *
 * @throws \Error If the array is empty or a non-Promise is in the array.
 */
function first(array $promises): Promise
{
    if ($promises === []) {
        throw new \Error("No promises provided");
    }

    $deferred = new Deferred;
    $result = $deferred->promise();

    $remaining = \count($promises);
    $failures = [];

    foreach ($promises as $index => $item) {
        if (!($item instanceof Promise)) {
            throw createTypeError([Promise::class], $item);
        }

        // Reserve the slot up front so the failure list keeps the order of the input array.
        $failures[$index] = null;

        $item->onResolve(function ($error, $value) use (&$deferred, &$failures, &$remaining, $index) {
            if ($remaining === 0) {
                return; // The combined promise has already been settled.
            }

            if (!$error) {
                // First success wins: zero the counter so later callbacks become no-ops.
                $remaining = 0;
                $deferred->resolve($value);
                $deferred = null;
                return;
            }

            $failures[$index] = $error;
            $remaining--;

            if ($remaining === 0) {
                $deferred->fail(new MultiReasonException($failures));
            }
        });
    }

    return $result;
}
/**
 * Resolves with a two-item array of the form [exceptions, values] once every given promise has settled.
 *
 * Both inner arrays use the keys of the input array; each key appears in exactly one of them. The returned
 * promise fails with a {@see MultiReasonException} if fewer than $required promises succeeded. An empty input
 * array (only possible with $required = 0) yields an already successful promise holding two empty arrays.
 *
 * @param Promise[] $promises Array of only promises.
 * @param int $required Number of promises that must succeed for the
 *     returned promise to succeed.
 *
 * @return Promise
 *
 * @throws \Error If $required is negative, if fewer than $required promises are given, or if a non-Promise
 *     is in the array.
 */
function some(array $promises, int $required = 1): Promise
{
    if ($required < 0) {
        throw new \Error("Number of promises required must be non-negative");
    }

    $remaining = \count($promises);

    if ($remaining < $required) {
        throw new \Error("Too few promises provided");
    }

    if ($promises === []) {
        return new Success([[], []]);
    }

    $deferred = new Deferred;
    $result = $deferred->promise();

    $successes = [];
    $failures = [];

    foreach ($promises as $index => $item) {
        if (!($item instanceof Promise)) {
            throw createTypeError([Promise::class], $item);
        }

        // Reserve a slot in both arrays so each keeps the order of the input array.
        $failures[$index] = null;
        $successes[$index] = null;

        $item->onResolve(static function ($error, $value) use (
            &$successes,
            &$failures,
            &$remaining,
            $index,
            $required,
            $deferred
        ) {
            // Record the outcome in one array and drop the placeholder from the other.
            if ($error) {
                $failures[$index] = $error;
                unset($successes[$index]);
            } else {
                $successes[$index] = $value;
                unset($failures[$index]);
            }

            $remaining--;

            if ($remaining !== 0) {
                return;
            }

            if (\count($successes) < $required) {
                $deferred->fail(new MultiReasonException($failures));
                return;
            }

            $deferred->resolve([$failures, $successes]);
        });
    }

    return $result;
}
2018-11-26 19:36:46 +01:00
/**
 * Wraps a promise into another promise, altering the exception or result.
 *
 * The given promise is awaited in a new green thread. Once it settles, $callback is invoked exactly once:
 * with (null, $value) if the promise succeeded, or with ($exception, null) if it failed. The returned promise
 * is resolved with the callback's return value. If the callback itself throws, the returned promise fails
 * with that exception; the callback is not invoked a second time.
 *
 * @param Promise $promise Promise whose outcome is passed to the callback.
 * @param callable $callback Function receiving (?\Throwable $exception, mixed $value).
 *
 * @return Promise
 *
 * @deprecated Use {@see await()} instead.
 */
function wrap(Promise $promise, callable $callback): Promise
{
    return async(static function () use ($promise, $callback) {
        try {
            $value = await($promise);
        } catch (\Throwable $exception) {
            return $callback($exception, null);
        }

        // Deliberately outside the try block: an exception thrown by the callback must propagate instead of
        // being caught above and fed back into the callback.
        return $callback(null, $value);
    });
}
2016-07-19 06:23:25 +02:00
}
2018-06-18 20:00:01 +02:00
namespace Amp\Iterator
{
2020-09-29 21:25:42 +02:00
use Amp\Delayed;
use Amp\Emitter;
2017-04-27 17:51:06 +02:00
use Amp\Iterator;
2020-08-23 16:18:28 +02:00
use Amp\Pipeline;
use Amp\Producer;
use Amp\Promise;
2018-10-05 21:01:57 +02:00
use function Amp\call;
2017-04-26 20:06:41 +02:00
use function Amp\coroutine;
use function Amp\Internal\createTypeError;
/**
 * Creates an iterator from the given iterable, emitting each value. The iterable may contain promises. If any
 * promise fails, the iterator will fail with the same reason.
 *
 * When $delay is non-zero, a {@see Delayed} of that many milliseconds is yielded before each emission.
 *
 * @param iterable $iterable Elements to emit.
 * @param int $delay Delay between element emissions in milliseconds.
 *
 * @return Iterator
 *
 * @throws \TypeError If the argument is not an array or instance of \Traversable.
 */
function fromIterable(iterable $iterable, int $delay = 0): Iterator
{
    return new Producer(static function (callable $emit) use ($iterable, $delay) {
        foreach ($iterable as $element) {
            if ($delay) {
                yield new Delayed($delay);
            }

            yield $emit($element);
        }
    });
}
2016-12-11 16:17:51 +01:00
/**
 * Creates an iterator that emits the result of applying $onEmit to every value of the given iterator.
 *
 * @template TValue
 * @template TReturn
 *
 * @param Iterator<TValue> $iterator Source iterator.
 * @param callable (TValue $value): TReturn $onEmit Function applied to each emitted value.
 *
 * @return Iterator<TReturn>
 */
function map(Iterator $iterator, callable $onEmit): Iterator
{
    $producer = static function (callable $emit) use ($iterator, $onEmit) {
        while (yield $iterator->advance()) {
            $mapped = $onEmit($iterator->getCurrent());

            yield $emit($mapped);
        }
    };

    return new Producer($producer);
}
/**
 * Creates an iterator that emits only those values of the given iterator for which $filter returns a truthy
 * value.
 *
 * @template TValue
 *
 * @param Iterator<TValue> $iterator Source iterator.
 * @param callable(TValue $value):bool $filter Predicate deciding whether a value is emitted.
 *
 * @return Iterator<TValue>
 */
function filter(Iterator $iterator, callable $filter): Iterator
{
    $producer = static function (callable $emit) use ($iterator, $filter) {
        while (yield $iterator->advance()) {
            if (!$filter($iterator->getCurrent())) {
                continue;
            }

            yield $emit($iterator->getCurrent());
        }
    };

    return new Producer($producer);
}
/**
 * Creates an iterator that emits values emitted from any iterator in the array of iterators.
 *
 * One coroutine per source iterator forwards its values to a shared {@see Emitter}. The resulting iterator
 * completes once all coroutines have finished, and fails if any of them fails.
 *
 * @param Iterator[] $iterators
 *
 * @return Iterator
 *
 * @throws \TypeError If a non-Iterator is in the array.
 */
function merge(array $iterators): Iterator
{
    $emitter = new Emitter;
    $result = $emitter->iterate();

    // $emitter is captured by reference; it is set to null on failure, which stops the forwarding loops.
    $forward = coroutine(static function (Iterator $source) use (&$emitter) {
        while (true) {
            if (!(yield $source->advance()) || $emitter === null) {
                break;
            }

            yield $emitter->emit($source->getCurrent());
        }
    });

    $running = [];

    foreach ($iterators as $candidate) {
        if (!($candidate instanceof Iterator)) {
            throw createTypeError([Iterator::class], $candidate);
        }

        $running[] = $forward($candidate);
    }

    Promise\all($running)->onResolve(static function ($exception) use (&$emitter) {
        if (!$exception) {
            $emitter->complete();
            return;
        }

        $emitter->fail($exception);
        $emitter = null;
    });

    return $result;
}
2016-12-11 16:17:51 +01:00
/**
 * Concatenates the given iterators into a single iterator, emitting values from a single iterator at a time. The
 * prior iterator must complete before values are emitted from any subsequent iterators. Iterators are concatenated
 * in the order given (iteration order of the array).
 *
 * All array entries are validated before the resulting iterator is created.
 *
 * @param Iterator[] $iterators
 *
 * @return Iterator
 *
 * @throws \TypeError If a non-Iterator is in the array.
 */
function concat(array $iterators): Iterator
{
    foreach ($iterators as $candidate) {
        if ($candidate instanceof Iterator) {
            continue;
        }

        throw createTypeError([Iterator::class], $candidate);
    }

    $producer = static function (callable $emit) use ($iterators) {
        foreach ($iterators as $source) {
            while (yield $source->advance()) {
                yield $emit($source->getCurrent());
            }
        }
    };

    return new Producer($producer);
}
2018-10-05 21:01:57 +02:00
2020-05-06 18:57:29 +02:00
/**
 * Advances the iterator until it is exhausted, ignoring every item, and resolves with the number of items
 * that were discarded.
 *
 * @template TValue
 *
 * @param Iterator $iterator
 *
 * @return Promise
 *
 * @psalm-param Iterator<TValue> $iterator
 * @psalm-return Promise<int>
 */
function discard(Iterator $iterator): Promise
{
    return call(static function () use ($iterator): \Generator {
        $discarded = 0;

        while (true) {
            if (!(yield $iterator->advance())) {
                return $discarded;
            }

            ++$discarded;
        }
    });
}
2018-10-05 21:01:57 +02:00
/**
 * Collects all items from an iterator into a list, in the order they are emitted.
 *
 * @template TValue
 *
 * @param Iterator $iterator
 *
 * @psalm-param Iterator<TValue> $iterator
 *
 * @return Promise
 * @psalm-return Promise<array<int, TValue>>
 */
function toArray(Iterator $iterator): Promise
{
    $collector = static function () use ($iterator): \Generator {
        /** @psalm-var list $items */
        $items = [];

        while (yield $iterator->advance()) {
            $current = $iterator->getCurrent();
            $items[] = $current;
        }

        return $items;
    };

    return call($collector);
}
2020-05-13 17:15:21 +02:00
2020-05-18 20:49:56 +02:00
/**
 * Adapts a pipeline to the Iterator interface.
 *
 * Values are pulled from the pipeline with continue() and re-emitted until continue() yields null.
 *
 * @template TValue
 *
 * @param Pipeline $stream Pipeline to read from.
 *
 * @psalm-param Pipeline<TValue> $stream
 *
 * @return Iterator
 *
 * @psalm-return Iterator<TValue>
 */
function fromPipeline(Pipeline $stream): Iterator
{
    $producer = static function (callable $emit) use ($stream): \Generator {
        while (($item = yield $stream->continue()) !== null) {
            yield $emit($item);
        }
    };

    return new Producer($producer);
}
}
2020-08-23 16:18:28 +02:00
namespace Amp\Pipeline
2020-05-13 17:15:21 +02:00
{
2020-09-29 21:25:42 +02:00
2020-05-13 17:15:21 +02:00
use Amp\AsyncGenerator;
2020-08-23 16:18:28 +02:00
use Amp\Pipeline;
use Amp\PipelineSource;
2020-05-13 17:15:21 +02:00
use Amp\Promise;
2020-10-02 20:55:58 +02:00
use function Amp\async;
use function Amp\asyncCallable;
2020-09-25 05:14:58 +02:00
use function Amp\await;
use function Amp\delay;
2020-05-13 17:15:21 +02:00
use function Amp\Internal\createTypeError;
/**
 * Creates a pipeline from the given iterable, emitting each value. The iterable may contain promises, which are
 * awaited before being emitted. If any promise fails, the returned pipeline will fail with the same reason.
 *
 * When $delay is non-zero, {@see delay()} is called with it before each element is emitted.
 *
 * @template TValue
 *
 * @param iterable $iterable Elements to emit.
 * @param int $delay Delay between elements emitted in milliseconds.
 *
 * @psalm-param iterable<TValue> $iterable
 *
 * @return Pipeline
 *
 * @psalm-return Pipeline<TValue>
 *
 * @throws \TypeError If the argument is not an array or instance of \Traversable.
 */
function fromIterable(iterable $iterable, int $delay = 0): Pipeline
{
    $generator = static function () use ($iterable, $delay): \Generator {
        foreach ($iterable as $element) {
            if ($delay) {
                delay($delay);
            }

            yield ($element instanceof Promise ? await($element) : $element);
        }
    };

    return new AsyncGenerator($generator);
}
/**
 * Creates a pipeline that emits the result of applying $onEmit to every value of the given pipeline.
 *
 * The source is read with continue() until it returns null.
 *
 * @template TValue
 * @template TReturn
 *
 * @param Pipeline $pipeline Source pipeline.
 * @param callable(TValue $value):TReturn $onEmit Function applied to each emitted value.
 *
 * @psalm-param Pipeline<TValue> $pipeline
 *
 * @return Pipeline
 *
 * @psalm-return Pipeline<TReturn>
 */
function map(Pipeline $pipeline, callable $onEmit): Pipeline
{
    $generator = static function () use ($pipeline, $onEmit): \Generator {
        while (($item = $pipeline->continue()) !== null) {
            $mapped = $onEmit($item);

            yield $mapped;
        }
    };

    return new AsyncGenerator($generator);
}
/**
 * Creates a pipeline that emits only those values of the given pipeline for which $filter returns a truthy
 * value.
 *
 * The source is read with continue() until it returns null.
 *
 * @template TValue
 *
 * @param Pipeline $pipeline Source pipeline.
 * @param callable(TValue $value):bool $filter Predicate deciding whether a value is emitted.
 *
 * @psalm-param Pipeline<TValue> $pipeline
 *
 * @return Pipeline
 *
 * @psalm-return Pipeline<TValue>
 */
function filter(Pipeline $pipeline, callable $filter): Pipeline
{
    $generator = static function () use ($pipeline, $filter): \Generator {
        while (($item = $pipeline->continue()) !== null) {
            if (!$filter($item)) {
                continue;
            }

            yield $item;
        }
    };

    return new AsyncGenerator($generator);
}
/**
 * Creates a pipeline that emits values emitted from any pipeline in the array of pipelines.
 *
 * One green thread per source pipeline forwards its values to a shared {@see PipelineSource}. The resulting
 * pipeline completes once all of them have finished, and fails if any of them fails.
 *
 * @param Pipeline[] $pipelines
 *
 * @return Pipeline
 *
 * @throws \TypeError If a non-Pipeline is in the array.
 */
function merge(array $pipelines): Pipeline
{
    $source = new PipelineSource;
    $result = $source->pipe();

    // $source is captured by reference; it is set to null once the merged pipeline has been settled.
    $forward = asyncCallable(static function (Pipeline $stream) use (&$source) {
        while (true) {
            $item = $stream->continue();

            if ($item === null || $source === null) {
                break;
            }

            $source->yield($item);
        }
    });

    $running = [];

    foreach ($pipelines as $candidate) {
        if (!($candidate instanceof Pipeline)) {
            throw createTypeError([Pipeline::class], $candidate);
        }

        $running[] = $forward($candidate);
    }

    Promise\all($running)->onResolve(static function ($exception) use (&$source) {
        // Detach the source first so the forwarding loops stop before it is completed or failed.
        $settling = $source;
        $source = null;

        if (!$exception) {
            $settling->complete();
            return;
        }

        $settling->fail($exception);
    });

    return $result;
}
/**
 * Concatenates the given pipelines into a single pipeline, emitting from a single pipeline at a time. The
 * prior pipeline must complete before values are emitted from any subsequent pipelines. Pipelines are
 * concatenated in the order given (iteration order of the array).
 *
 * All array entries are validated before the resulting pipeline is created. Each source pipeline is read with
 * continue() until it returns null; falsy values such as 0, '' or false are emitted like any other value.
 *
 * @param Pipeline[] $pipelines
 *
 * @return Pipeline
 *
 * @throws \TypeError If a non-Pipeline is in the array.
 */
function concat(array $pipelines): Pipeline
{
    foreach ($pipelines as $pipeline) {
        if (!$pipeline instanceof Pipeline) {
            throw createTypeError([Pipeline::class], $pipeline);
        }
    }

    return new AsyncGenerator(static function () use ($pipelines): \Generator {
        foreach ($pipelines as $stream) {
            // Compare against null explicitly: only null marks the end of a pipeline, so a truthiness
            // check would cut the pipeline short at the first falsy value.
            while (null !== $value = $stream->continue()) {
                yield $value;
            }
        }
    });
}
/**
 * Reads the pipeline in a new green thread until continue() returns null, ignoring every item, and resolves
 * with the number of items that were discarded.
 *
 * @template TValue
 *
 * @param Pipeline $pipeline
 *
 * @psalm-param Pipeline<TValue> $pipeline
 *
 * @return Promise<int>
 */
function discard(Pipeline $pipeline): Promise
{
    $drain = static function () use ($pipeline): int {
        for ($discarded = 0; $pipeline->continue() !== null; ++$discarded) {
            // Nothing to do: the item is intentionally dropped.
        }

        return $discarded;
    };

    return async($drain);
}
/**
 * Collects all items from a pipeline into an array by passing the pipeline to iterator_to_array() with keys
 * preserved (the function's default behaviour).
 *
 * @template TValue
 *
 * @param Pipeline $pipeline
 *
 * @psalm-param Pipeline<TValue> $pipeline
 *
 * @return array
 *
 * @psalm-return array<int, TValue>
 */
function toArray(Pipeline $pipeline): array
{
    $items = \iterator_to_array($pipeline, true);

    return $items;
}
2016-05-24 18:47:14 +02:00
}