1
0
mirror of https://github.com/danog/amp.git synced 2024-11-30 04:29:08 +01:00
amp/lib/functions.php

748 lines
22 KiB
PHP
Raw Normal View History

<?php
2016-08-16 06:46:26 +02:00
2016-05-24 05:48:28 +02:00
namespace Amp;
2016-05-21 16:44:52 +02:00
2017-02-20 21:53:58 +01:00
use React\Promise\PromiseInterface as ReactPromise;
/**
2016-11-14 20:59:21 +01:00
* Wraps the callback in a promise/coroutine-aware function that automatically upgrades Generators to coroutines and
2016-12-29 23:57:08 +01:00
* calls rethrow() on the returned promises (or the coroutine created).
*
* @param callable(...$args): \Generator|\Amp\Promise|mixed $callback
*
* @return callable(...$args): void
*/
2016-08-11 21:35:58 +02:00
function wrap(callable $callback): callable {
return function (...$args) use ($callback) {
$result = $callback(...$args);
if ($result instanceof \Generator) {
2016-12-29 23:57:08 +01:00
$result = new Coroutine($result);
} elseif ($result instanceof ReactPromise) {
2017-02-20 21:53:58 +01:00
$result = adapt($result);
2016-12-29 23:57:08 +01:00
}
2016-12-29 23:57:08 +01:00
if ($result instanceof Promise) {
rethrow($result);
}
};
}
2016-05-21 16:44:52 +02:00
2016-05-21 19:19:48 +02:00
/**
2016-11-14 20:59:21 +01:00
* Returns a new function that wraps $worker in a promise/coroutine-aware function that automatically upgrades
* Generators to coroutines. The returned function always returns a promise when invoked. If $worker throws, a failed
* promise is returned.
2016-05-21 19:19:48 +02:00
*
* @param callable(mixed ...$args): mixed $worker
2016-05-21 19:19:48 +02:00
*
* @return callable(mixed ...$args): \Amp\Promise
2016-05-21 19:19:48 +02:00
*/
2016-08-11 21:35:58 +02:00
function coroutine(callable $worker): callable {
2016-11-14 20:59:21 +01:00
return function (...$args) use ($worker): Promise {
try {
$result = $worker(...$args);
} catch (\Throwable $exception) {
return new Failure($exception);
}
2016-05-21 19:19:48 +02:00
if ($result instanceof \Generator) {
return new Coroutine($result);
} elseif ($result instanceof ReactPromise) {
2017-02-20 21:53:58 +01:00
$result = adapt($result);
}
2016-12-11 16:17:51 +01:00
2016-11-14 20:59:21 +01:00
if (!$result instanceof Promise) {
return new Success($result);
2016-05-21 19:19:48 +02:00
}
2016-05-21 16:44:52 +02:00
return $result;
2016-05-21 19:19:48 +02:00
};
}
2016-05-21 16:44:52 +02:00
2017-02-22 22:52:30 +01:00
/**
* Calls the given function, always returning a promise. If the function returns a Generator, it will be run as a
* coroutine. If the function throws, a failed promise will be returned.
*
* @param callable(mixed ...$args): mixed $functor
2017-02-22 22:52:30 +01:00
* @param array ...$args Arguments to pass to the function.
*
2017-03-12 21:39:17 +01:00
* @return \Amp\Promise
2017-02-22 22:52:30 +01:00
*/
function call(callable $functor, ...$args): Promise {
try {
$result = $functor(...$args);
} catch (\Throwable $exception) {
return new Failure($exception);
}
if ($result instanceof \Generator) {
return new Coroutine($result);
} elseif ($result instanceof ReactPromise) {
2017-02-20 21:53:58 +01:00
$result = adapt($result);
2017-02-22 22:52:30 +01:00
}
if (!$result instanceof Promise) {
return new Success($result);
}
return $result;
}
2016-05-21 19:19:48 +02:00
/**
2016-11-14 20:59:21 +01:00
* Registers a callback that will forward the failure reason to the Loop error handler if the promise fails.
2016-05-21 19:19:48 +02:00
*
* @param \Amp\Promise|\React\Promise\PromiseInterface $promise
*
* @throws \TypeError If $promise is not an instance of \Amp\Promise or \React\Promise\PromiseInterface.
2016-05-21 19:19:48 +02:00
*/
function rethrow($promise) {
if (!$promise instanceof Promise) {
if ($promise instanceof ReactPromise) {
$promise = adapt($promise);
} else {
throw new \TypeError("Must provide an instance of %s or %s", Promise::class, ReactPromise::class);
}
}
2016-11-14 20:59:21 +01:00
$promise->when(function ($exception) {
2016-05-21 16:44:52 +02:00
if ($exception) {
throw $exception;
}
2016-05-21 19:19:48 +02:00
});
}
2016-05-21 16:44:52 +02:00
2016-05-21 19:19:48 +02:00
/**
2016-11-14 20:59:21 +01:00
* Runs the event loop until the promise is resolved. Should not be called within a running event loop.
2016-05-21 19:19:48 +02:00
*
* @param \Amp\Promise|\React\Promise\PromiseInterface $promise
2016-05-21 19:19:48 +02:00
*
2016-11-14 20:59:21 +01:00
* @return mixed Promise success value.
2016-05-21 19:19:48 +02:00
*
* @throws \TypeError If $promise is not an instance of \Amp\Promise or \React\Promise\PromiseInterface.
2016-11-14 20:59:21 +01:00
* @throws \Throwable Promise failure reason.
2016-05-21 19:19:48 +02:00
*/
function wait($promise) {
if (!$promise instanceof Promise) {
if ($promise instanceof ReactPromise) {
$promise = adapt($promise);
} else {
throw new \TypeError("Must provide an instance of %s or %s", Promise::class, ReactPromise::class);
}
}
2016-05-26 07:09:50 +02:00
$resolved = false;
Loop::run(function () use (&$resolved, &$value, &$exception, $promise) {
2016-11-14 20:59:21 +01:00
$promise->when(function ($e, $v) use (&$resolved, &$value, &$exception) {
2016-05-21 19:19:48 +02:00
Loop::stop();
2016-05-26 07:09:50 +02:00
$resolved = true;
2016-05-21 19:19:48 +02:00
$exception = $e;
$value = $v;
});
2017-03-11 05:59:11 +01:00
});
2016-05-26 07:09:50 +02:00
if (!$resolved) {
2016-12-11 16:12:42 +01:00
throw new \Error("Loop stopped without resolving promise");
2016-05-26 07:09:50 +02:00
}
2016-05-21 19:19:48 +02:00
if ($exception) {
throw $exception;
2016-05-21 16:44:52 +02:00
}
2016-05-21 19:19:48 +02:00
return $value;
}
2016-05-21 16:44:52 +02:00
2016-05-21 19:19:48 +02:00
/**
* Pipe the promised value through the specified functor once it resolves.
*
* @param \Amp\Promise|\React\Promise\PromiseInterface $promise
* @param callable(mixed $value): mixed $functor
2016-05-21 19:19:48 +02:00
*
* @return \Amp\Promise
*
* @throws \TypeError If $promise is not an instance of \Amp\Promise or \React\Promise\PromiseInterface.
2016-05-21 19:19:48 +02:00
*/
function pipe($promise, callable $functor): Promise {
if (!$promise instanceof Promise) {
if ($promise instanceof ReactPromise) {
$promise = adapt($promise);
} else {
throw new \TypeError("Must provide an instance of %s or %s", Promise::class, ReactPromise::class);
}
}
$deferred = new Deferred;
2016-05-21 19:19:48 +02:00
2016-11-14 20:59:21 +01:00
$promise->when(function ($exception, $value) use ($deferred, $functor) {
2016-05-21 19:19:48 +02:00
if ($exception) {
$deferred->fail($exception);
return;
}
2016-05-21 16:44:52 +02:00
2016-05-21 19:19:48 +02:00
try {
$deferred->resolve($functor($value));
} catch (\Throwable $exception) {
$deferred->fail($exception);
}
});
2016-05-21 16:44:52 +02:00
2016-11-14 20:59:21 +01:00
return $deferred->promise();
2016-05-21 19:19:48 +02:00
}
2016-05-21 16:44:52 +02:00
2016-05-21 19:19:48 +02:00
/**
* @param \Amp\Promise|\React\Promise\PromiseInterface $promise
* @param string $className Throwable class name to capture. Given callback will only be invoked if the failure reason
* is an instance of the given throwable class name.
* @param callable(\Throwable $exception): mixed $functor
2016-05-21 19:19:48 +02:00
*
* @return \Amp\Promise
*
* @throws \TypeError If $promise is not an instance of \Amp\Promise or \React\Promise\PromiseInterface.
2016-05-21 19:19:48 +02:00
*/
function capture($promise, string $className, callable $functor): Promise {
if (!$promise instanceof Promise) {
if ($promise instanceof ReactPromise) {
$promise = adapt($promise);
} else {
throw new \TypeError("Must provide an instance of %s or %s", Promise::class, ReactPromise::class);
}
}
$deferred = new Deferred;
2016-05-21 19:19:48 +02:00
2016-11-14 20:59:21 +01:00
$promise->when(function ($exception, $value) use ($deferred, $className, $functor) {
2016-05-21 19:19:48 +02:00
if (!$exception) {
$deferred->resolve($value);
return;
}
2016-05-21 16:44:52 +02:00
if (!$exception instanceof $className) {
$deferred->fail($exception);
return;
}
2016-05-21 19:19:48 +02:00
try {
$deferred->resolve($functor($exception));
} catch (\Throwable $exception) {
$deferred->fail($exception);
}
});
2016-05-21 16:44:52 +02:00
2016-11-14 20:59:21 +01:00
return $deferred->promise();
2016-05-21 19:19:48 +02:00
}
2016-05-21 16:44:52 +02:00
2016-05-21 19:19:48 +02:00
/**
2016-11-14 20:59:21 +01:00
* Create an artificial timeout for any Promise.
2016-05-21 19:19:48 +02:00
*
2016-11-14 20:59:21 +01:00
* If the timeout expires before the promise is resolved, the returned promise fails with an instance of
2016-08-11 21:35:58 +02:00
* \Amp\TimeoutException.
2016-05-21 19:19:48 +02:00
*
* @param \Amp\Promise|\React\Promise\PromiseInterface $promise
* @param int $timeout Timeout in milliseconds.
2016-05-21 19:19:48 +02:00
*
* @return \Amp\Promise
*
* @throws \TypeError If $promise is not an instance of \Amp\Promise or \React\Promise\PromiseInterface.
2016-05-21 19:19:48 +02:00
*/
function timeout($promise, int $timeout): Promise {
if (!$promise instanceof Promise) {
if ($promise instanceof ReactPromise) {
$promise = adapt($promise);
} else {
throw new \TypeError("Must provide an instance of %s or %s", Promise::class, ReactPromise::class);
}
}
$deferred = new Deferred;
2016-07-12 18:20:06 +02:00
$resolved = false;
2016-05-21 19:19:48 +02:00
2016-07-12 18:20:06 +02:00
$watcher = Loop::delay($timeout, function () use (&$resolved, $deferred) {
if (!$resolved) {
$resolved = true;
$deferred->fail(new TimeoutException);
}
});
2016-12-17 15:28:19 +01:00
Loop::unreference($watcher);
2016-05-21 19:19:48 +02:00
2016-11-14 20:59:21 +01:00
$promise->when(function () use (&$resolved, $promise, $deferred, $watcher) {
2016-05-21 19:19:48 +02:00
Loop::cancel($watcher);
2016-07-12 18:20:06 +02:00
if ($resolved) {
return;
}
$resolved = true;
2016-11-14 20:59:21 +01:00
$deferred->resolve($promise);
});
2016-05-21 19:19:48 +02:00
2016-11-14 20:59:21 +01:00
return $deferred->promise();
2016-05-21 19:19:48 +02:00
}
2016-05-21 16:44:52 +02:00
2016-05-21 19:19:48 +02:00
/**
2017-02-20 21:53:58 +01:00
* Adapts any object with a done(callable $onFulfilled, callable $onRejected) or then(callable $onFulfilled,
* callable $onRejected) method to a promise usable by components depending on placeholders implementing
* \AsyncInterop\Promise.
2016-05-21 19:19:48 +02:00
*
2017-02-20 21:53:58 +01:00
* @param object $promise Object with a done() or then() method.
2016-05-21 19:19:48 +02:00
*
* @return \Amp\Promise Promise resolved by the $thenable object.
*
2016-08-11 21:35:58 +02:00
* @throws \Error If the provided object does not have a then() method.
2016-05-21 19:19:48 +02:00
*/
2017-02-20 21:53:58 +01:00
function adapt($promise): Promise {
2016-05-22 20:24:39 +02:00
$deferred = new Deferred;
2017-02-20 21:53:58 +01:00
if (\method_exists($promise, 'done')) {
$promise->done([$deferred, 'resolve'], [$deferred, 'fail']);
} elseif (\method_exists($promise, 'then')) {
$promise->then([$deferred, 'resolve'], [$deferred, 'fail']);
} else {
throw new \Error("Object must have a 'then' or 'done' method");
}
2016-05-22 20:24:39 +02:00
2016-11-14 20:59:21 +01:00
return $deferred->promise();
2016-05-21 19:19:48 +02:00
}
/**
2016-11-14 20:59:21 +01:00
* Wraps the given callable $worker in a promise aware function that has the same number of arguments as $worker,
* but those arguments may be promises for the future argument value or just values. The returned function will
* return a promise for the return value of $worker and will never throw. The $worker function will not be called
* until each promise given as an argument is fulfilled. If any promise provided as an argument fails, the
* promise returned by the returned function will be failed for the same reason. The promise succeeds with
2016-05-21 19:19:48 +02:00
* the return value of $worker or failed if $worker throws.
*
* @param callable $worker
*
* @return callable
*/
2016-08-11 21:35:58 +02:00
function lift(callable $worker): callable {
2016-05-21 16:44:52 +02:00
/**
2016-11-14 20:59:21 +01:00
* @param mixed ...$args Promises or values.
2016-05-21 16:44:52 +02:00
*
* @return \Amp\Promise
2016-05-21 16:44:52 +02:00
*/
2016-11-14 20:59:21 +01:00
return function (...$args) use ($worker): Promise {
foreach ($args as $key => $arg) {
2016-11-14 20:59:21 +01:00
if (!$arg instanceof Promise) {
2017-02-20 21:53:58 +01:00
if ($arg instanceof ReactPromise) {
$args[$key] = adapt($arg);
} else {
$args[$key] = new Success($arg);
}
}
}
2016-05-21 19:19:48 +02:00
if (1 === \count($args)) {
return pipe($args[0], $worker);
2016-05-21 16:44:52 +02:00
}
2016-05-21 19:19:48 +02:00
return pipe(all($args), function (array $args) use ($worker) {
\ksort($args); // Needed to ensure correct argument order.
2016-08-11 21:35:58 +02:00
return $worker(...$args);
2016-05-21 16:44:52 +02:00
});
2016-05-21 19:19:48 +02:00
};
}
2016-05-21 16:44:52 +02:00
2016-05-21 19:19:48 +02:00
/**
2016-11-14 20:59:21 +01:00
* Returns a promise that is resolved when all promises are resolved. The returned promise will not fail.
* Returned promise succeeds with a two-item array delineating successful and failed promise results,
* with keys identical and corresponding to the original given array.
*
* This function is the same as some() with the notable exception that it will never fail even
2016-11-14 20:59:21 +01:00
* if all promises in the array resolve unsuccessfully.
2016-05-21 19:19:48 +02:00
*
2016-11-14 20:59:21 +01:00
* @param Promise[] $promises
2016-05-21 19:19:48 +02:00
*
* @return \Amp\Promise
*
2016-11-14 20:59:21 +01:00
* @throws \Error If a non-Promise is in the array.
2016-05-21 19:19:48 +02:00
*/
2016-11-14 20:59:21 +01:00
function any(array $promises): Promise {
if (empty($promises)) {
return new Success([[], []]);
2016-05-21 19:19:48 +02:00
}
2016-05-21 16:44:52 +02:00
$deferred = new Deferred;
2016-05-21 16:44:52 +02:00
2016-11-14 20:59:21 +01:00
$pending = \count($promises);
$errors = [];
$values = [];
2016-05-21 16:44:52 +02:00
2016-11-14 20:59:21 +01:00
foreach ($promises as $key => $promise) {
2017-02-20 21:53:58 +01:00
if ($promise instanceof ReactPromise) {
$promise = adapt($promise);
} elseif (!$promise instanceof Promise) {
2016-11-14 20:59:21 +01:00
throw new \Error("Non-promise provided");
}
2016-11-14 20:59:21 +01:00
$promise->when(function ($error, $value) use (&$pending, &$errors, &$values, $key, $deferred) {
if ($error) {
$errors[$key] = $error;
} else {
$values[$key] = $value;
}
2016-05-21 16:44:52 +02:00
if (--$pending === 0) {
$deferred->resolve([$errors, $values]);
}
});
}
2016-11-14 20:59:21 +01:00
return $deferred->promise();
2016-05-21 19:19:48 +02:00
}
2016-05-21 16:44:52 +02:00
2016-05-21 19:19:48 +02:00
/**
2016-11-14 20:59:21 +01:00
* Returns a promise that succeeds when all promises succeed, and fails if any promise fails. Returned
* promise succeeds with an array of values used to succeed each contained promise, with keys corresponding to
* the array of promises.
2016-05-21 19:19:48 +02:00
*
2016-11-14 20:59:21 +01:00
* @param Promise[] $promises
2016-05-21 19:19:48 +02:00
*
* @return \Amp\Promise
*
2016-11-14 20:59:21 +01:00
* @throws \Error If a non-Promise is in the array.
2016-05-21 19:19:48 +02:00
*/
2016-11-14 20:59:21 +01:00
function all(array $promises): Promise {
if (empty($promises)) {
return new Success([]);
2016-05-21 16:44:52 +02:00
}
$deferred = new Deferred;
2016-05-21 16:44:52 +02:00
2016-11-14 20:59:21 +01:00
$pending = \count($promises);
2016-05-24 04:32:41 +02:00
$resolved = false;
2016-05-21 19:19:48 +02:00
$values = [];
2016-05-21 16:44:52 +02:00
2016-11-14 20:59:21 +01:00
foreach ($promises as $key => $promise) {
2017-02-20 21:53:58 +01:00
if ($promise instanceof ReactPromise) {
$promise = adapt($promise);
} elseif (!$promise instanceof Promise) {
2016-11-14 20:59:21 +01:00
throw new \Error("Non-promise provided");
}
2016-11-14 20:59:21 +01:00
$promise->when(function ($exception, $value) use (&$values, &$pending, &$resolved, $key, $deferred) {
2016-05-24 04:32:41 +02:00
if ($resolved) {
return;
}
2016-05-21 19:19:48 +02:00
if ($exception) {
2016-05-24 04:32:41 +02:00
$resolved = true;
2016-05-21 19:19:48 +02:00
$deferred->fail($exception);
return;
}
2016-05-21 16:44:52 +02:00
2016-05-21 19:19:48 +02:00
$values[$key] = $value;
2016-05-21 16:44:52 +02:00
if (0 === --$pending) {
2016-05-21 19:19:48 +02:00
$deferred->resolve($values);
2016-05-21 16:44:52 +02:00
}
});
2016-05-21 19:19:48 +02:00
}
2016-05-21 16:44:52 +02:00
2016-11-14 20:59:21 +01:00
return $deferred->promise();
2016-05-21 19:19:48 +02:00
}
/**
2016-11-14 20:59:21 +01:00
* Returns a promise that succeeds when the first promise succeeds, and fails only if all promises fail.
2016-05-21 19:19:48 +02:00
*
2016-11-14 20:59:21 +01:00
* @param Promise[] $promises
2016-05-21 19:19:48 +02:00
*
* @return \Amp\Promise
*
2016-11-14 20:59:21 +01:00
* @throws \Error If the array is empty or a non-Promise is in the array.
2016-05-21 19:19:48 +02:00
*/
2016-11-14 20:59:21 +01:00
function first(array $promises): Promise {
if (empty($promises)) {
throw new \Error("No promises provided");
2016-05-21 16:44:52 +02:00
}
$deferred = new Deferred;
2016-05-21 16:44:52 +02:00
2016-11-14 20:59:21 +01:00
$pending = \count($promises);
2016-05-24 04:32:41 +02:00
$resolved = false;
2016-05-21 19:19:48 +02:00
$exceptions = [];
2016-05-21 16:44:52 +02:00
2016-11-14 20:59:21 +01:00
foreach ($promises as $key => $promise) {
2017-02-20 21:53:58 +01:00
if ($promise instanceof ReactPromise) {
$promise = adapt($promise);
} elseif (!$promise instanceof Promise) {
2016-11-14 20:59:21 +01:00
throw new \Error("Non-promise provided");
}
2016-11-14 20:59:21 +01:00
$promise->when(function ($exception, $value) use (&$exceptions, &$pending, &$resolved, $key, $deferred) {
2016-05-24 04:32:41 +02:00
if ($resolved) {
return;
}
2016-05-21 19:19:48 +02:00
if (!$exception) {
2016-05-24 04:32:41 +02:00
$resolved = true;
2016-05-21 19:19:48 +02:00
$deferred->resolve($value);
return;
}
2016-05-21 16:44:52 +02:00
2016-05-21 19:19:48 +02:00
$exceptions[$key] = $exception;
if (0 === --$pending) {
2016-05-24 17:39:19 +02:00
$deferred->fail(new MultiReasonException($exceptions));
2016-05-21 19:19:48 +02:00
}
});
2016-05-21 19:19:48 +02:00
}
2016-05-21 16:44:52 +02:00
2016-11-14 20:59:21 +01:00
return $deferred->promise();
2016-05-21 19:19:48 +02:00
}
/**
2016-11-14 20:59:21 +01:00
* Resolves with a two-item array delineating successful and failed Promise results.
2016-05-21 19:19:48 +02:00
*
2016-11-14 20:59:21 +01:00
* The returned promise will only fail if ALL of the promises fail.
*
2016-11-14 20:59:21 +01:00
* @param Promise[] $promises
2016-05-21 19:19:48 +02:00
*
* @return \Amp\Promise
2016-05-21 19:19:48 +02:00
*/
2016-11-14 20:59:21 +01:00
function some(array $promises): Promise {
if (empty($promises)) {
throw new \Error("No promises provided");
2016-05-21 16:44:52 +02:00
}
2016-11-14 20:59:21 +01:00
$pending = \count($promises);
2016-05-21 16:44:52 +02:00
$deferred = new Deferred;
2016-05-21 19:19:48 +02:00
$values = [];
$exceptions = [];
2016-11-14 20:59:21 +01:00
foreach ($promises as $key => $promise) {
2017-02-20 21:53:58 +01:00
if ($promise instanceof ReactPromise) {
$promise = adapt($promise);
} elseif (!$promise instanceof Promise) {
2016-11-14 20:59:21 +01:00
throw new \Error("Non-promise provided");
}
2016-11-14 20:59:21 +01:00
$promise->when(function ($exception, $value) use (&$values, &$exceptions, &$pending, $key, $deferred) {
2016-05-21 19:19:48 +02:00
if ($exception) {
2016-05-21 16:44:52 +02:00
$exceptions[$key] = $exception;
2016-07-31 07:31:04 +02:00
} else {
$values[$key] = $value;
}
if (0 === --$pending) {
if (empty($values)) {
2016-05-24 17:39:19 +02:00
$deferred->fail(new MultiReasonException($exceptions));
2016-07-31 07:31:04 +02:00
return;
2016-05-21 16:44:52 +02:00
}
2016-07-31 07:31:04 +02:00
$deferred->resolve([$exceptions, $values]);
2016-05-21 19:19:48 +02:00
}
});
2016-05-21 16:44:52 +02:00
}
2016-11-14 20:59:21 +01:00
return $deferred->promise();
2016-05-21 19:19:48 +02:00
}
2016-05-21 16:44:52 +02:00
2016-05-21 19:19:48 +02:00
/**
2016-11-14 20:59:21 +01:00
* Maps the callback to each promise as it succeeds. Returns an array of promises resolved by the return
* callback value of the callback function. The callback may return promises or throw exceptions to fail
* promises in the array. If a promise in the passed array fails, the callback will not be called and the
* promise in the array fails for the same reason. Tip: Use all() or any() to determine when all
* promises in the array have been resolved.
2016-05-21 19:19:48 +02:00
*
* @param callable(mixed $value): mixed $callback
2016-11-14 20:59:21 +01:00
* @param Promise[] ...$promises
2016-05-21 19:19:48 +02:00
*
* @return \Amp\Promise[] Array of promises resolved with the result of the mapped function.
2016-05-21 19:19:48 +02:00
*/
2016-11-14 20:59:21 +01:00
function map(callable $callback, array ...$promises): array {
foreach ($promises as $promiseSet) {
foreach ($promiseSet as $promise) {
2017-02-20 21:53:58 +01:00
if (!$promise instanceof Promise && !$promise instanceof ReactPromise) {
2016-11-14 20:59:21 +01:00
throw new \Error("Non-promise provided");
}
}
}
2017-02-10 05:17:45 +01:00
return \array_map(lift($callback), ...$promises);
2016-05-21 16:44:52 +02:00
}
2016-07-19 06:29:19 +02:00
/**
2017-01-04 02:10:27 +01:00
* Creates a stream from the given iterable, emitting the each value. The iterable may contain promises. If any promise
* fails, the stream will fail with the same reason.
*
* @param array|\Traversable $iterable
*
2017-01-04 02:10:27 +01:00
* @return \Amp\Stream
*
* @throws \TypeError If the argument is not an array or instance of \Traversable.
*/
2017-03-12 17:33:46 +01:00
function stream(/* iterable */ $iterable): Stream {
if (!$iterable instanceof \Traversable && !\is_array($iterable)) {
throw new \TypeError("Must provide an array or instance of Traversable");
}
2017-01-04 02:10:27 +01:00
return new Producer(function (callable $emit) use ($iterable) {
foreach ($iterable as $value) {
yield $emit($value);
}
});
}
2016-07-19 06:23:25 +02:00
/**
2017-01-04 02:10:27 +01:00
* @param \Amp\Stream $stream
* @param callable(mixed $value): mixed $onNext
* @param callable(mixed $value): mixed|null $onComplete
2016-07-19 06:23:25 +02:00
*
2017-01-04 02:10:27 +01:00
* @return \Amp\Stream
2016-07-19 06:23:25 +02:00
*/
2017-01-04 02:10:27 +01:00
function each(Stream $stream, callable $onNext, callable $onComplete = null): Stream {
2017-02-10 05:17:45 +01:00
$listener = new Listener($stream);
return new Producer(function (callable $emit) use ($listener, $onNext, $onComplete) {
while (yield $listener->advance()) {
yield $emit($onNext($listener->getCurrent()));
}
2016-07-19 06:23:25 +02:00
if ($onComplete === null) {
2017-02-10 05:17:45 +01:00
return $listener->getResult();
2016-07-19 06:23:25 +02:00
}
2017-02-10 05:17:45 +01:00
return $onComplete($listener->getResult());
2016-07-19 06:23:25 +02:00
});
}
/**
2017-01-04 02:10:27 +01:00
* @param \Amp\Stream $stream
* @param callable(mixed $value): bool $filter
2016-07-19 06:23:25 +02:00
*
2017-01-04 02:10:27 +01:00
* @return \Amp\Stream
2016-07-19 06:23:25 +02:00
*/
2017-01-04 02:10:27 +01:00
function filter(Stream $stream, callable $filter): Stream {
2017-02-10 05:17:45 +01:00
$listener = new Listener($stream);
return new Producer(function (callable $emit) use ($listener, $filter) {
while (yield $listener->advance()) {
if ($filter($listener->getCurrent())) {
yield $emit($listener->getCurrent());
2016-07-19 06:23:25 +02:00
}
}
2017-02-10 05:17:45 +01:00
return $listener->getResult();
2016-07-19 06:23:25 +02:00
});
}
2016-05-24 18:47:14 +02:00
/**
2017-01-04 02:10:27 +01:00
* Creates a stream that emits values emitted from any stream in the array of streams.
2016-05-24 18:47:14 +02:00
*
2017-01-04 02:10:27 +01:00
* @param \Amp\Stream[] $streams
2016-05-24 18:47:14 +02:00
*
2017-01-04 02:10:27 +01:00
* @return \Amp\Stream
2016-05-24 18:47:14 +02:00
*/
2017-01-04 02:10:27 +01:00
function merge(array $streams): Stream {
$emitter = new Emitter;
$pending = true;
2016-12-11 16:17:51 +01:00
2017-01-04 02:10:27 +01:00
foreach ($streams as $stream) {
if (!$stream instanceof Stream) {
throw new \Error("Non-stream provided");
2016-05-24 18:47:14 +02:00
}
2017-01-04 02:10:27 +01:00
$stream->listen(function ($value) use (&$pending, $emitter) {
if ($pending) {
2017-01-04 02:10:27 +01:00
return $emitter->emit($value);
}
return null;
});
2016-05-24 18:47:14 +02:00
}
2016-12-11 16:17:51 +01:00
2017-01-04 02:10:27 +01:00
all($streams)->when(function ($exception, array $values = null) use (&$pending, $emitter) {
$pending = false;
if ($exception) {
2017-01-04 02:10:27 +01:00
$emitter->fail($exception);
return;
2016-05-24 18:47:14 +02:00
}
2016-12-11 16:17:51 +01:00
2017-01-04 02:10:27 +01:00
$emitter->resolve($values);
2016-05-24 18:47:14 +02:00
});
2016-12-11 16:17:51 +01:00
2017-01-04 02:10:27 +01:00
return $emitter->stream();
2016-07-19 06:23:25 +02:00
}
2016-08-01 18:10:59 +02:00
/**
2017-01-04 02:10:27 +01:00
* Concatenates the given streams into a single stream, emitting values from a single stream at a time. The
* prior stream must complete before values are emitted from any subsequent stream. Streams are concatenated
2016-08-01 18:10:59 +02:00
* in the order given (iteration order of the array).
*
2017-01-04 02:10:27 +01:00
* @param array $streams
2016-08-01 18:10:59 +02:00
*
2017-01-04 02:10:27 +01:00
* @return \Amp\Stream
2016-08-01 18:10:59 +02:00
*/
2017-01-04 02:10:27 +01:00
function concat(array $streams): Stream {
foreach ($streams as $stream) {
if (!$stream instanceof Stream) {
throw new \Error("Non-stream provided");
2016-08-01 18:10:59 +02:00
}
}
2016-12-11 16:17:51 +01:00
2017-01-04 02:10:27 +01:00
$emitter = new Emitter;
$subscriptions = [];
$previous = [];
2016-11-14 20:59:21 +01:00
$promise = all($previous);
2016-12-11 16:17:51 +01:00
2017-01-04 02:10:27 +01:00
foreach ($streams as $stream) {
$generator = function ($value) use ($emitter, $promise) {
static $pending = true, $failed = false;
if ($failed) {
return;
}
if ($pending) {
try {
yield $promise;
$pending = false;
} catch (\Throwable $exception) {
$failed = true;
2017-01-04 02:10:27 +01:00
return; // Prior stream failed.
}
2016-08-01 18:10:59 +02:00
}
2017-01-04 02:10:27 +01:00
yield $emitter->emit($value);
};
2017-01-04 02:10:27 +01:00
$subscriptions[] = $stream->listen(function ($value) use ($generator) {
return new Coroutine($generator($value));
});
2017-01-04 02:10:27 +01:00
$previous[] = $stream;
2016-11-14 20:59:21 +01:00
$promise = all($previous);
}
2016-12-11 16:17:51 +01:00
2017-01-04 02:10:27 +01:00
$promise->when(function ($exception, array $values = null) use ($emitter) {
if ($exception) {
2017-01-04 02:10:27 +01:00
$emitter->fail($exception);
return;
2016-08-01 18:10:59 +02:00
}
2016-12-11 16:17:51 +01:00
2017-01-04 02:10:27 +01:00
$emitter->resolve($values);
2016-08-01 18:10:59 +02:00
});
2016-12-11 16:17:51 +01:00
2017-01-04 02:10:27 +01:00
return $emitter->stream();
2016-08-01 18:10:59 +02:00
}
2016-05-24 18:47:14 +02:00
/**
2017-01-04 02:10:27 +01:00
* Returns a stream that emits a value every $interval milliseconds after (up to $count times). The value emitted
* is an integer of the number of times the stream emitted a value.
2016-05-24 18:47:14 +02:00
*
* @param int $interval Time interval between emitted values in milliseconds.
* @param int $count Number of values to emit. PHP_INT_MAX by default.
2016-05-24 18:47:14 +02:00
*
2017-01-04 02:10:27 +01:00
* @return \Amp\Stream
*
* @throws \Error If the number of times to emit is not a positive value.
2016-05-24 18:47:14 +02:00
*/
2017-01-04 02:10:27 +01:00
function interval(int $interval, int $count = PHP_INT_MAX): Stream {
if (0 >= $count) {
throw new \Error("The number of times to emit must be a positive value");
2016-05-24 18:47:14 +02:00
}
2017-01-04 02:10:27 +01:00
$emitter = new Emitter;
2016-05-27 01:20:05 +02:00
2017-01-04 02:10:27 +01:00
Loop::repeat($interval, function ($watcher) use (&$i, $emitter, $count) {
$emitter->emit(++$i);
2016-05-27 01:20:05 +02:00
if ($i === $count) {
2016-05-24 18:47:14 +02:00
Loop::cancel($watcher);
2017-01-04 02:10:27 +01:00
$emitter->resolve();
2016-05-24 18:47:14 +02:00
}
});
2016-05-27 01:20:05 +02:00
2017-01-04 02:10:27 +01:00
return $emitter->stream();
2016-05-24 18:47:14 +02:00
}