1
0
mirror of https://github.com/danog/amp.git synced 2025-01-22 05:11:42 +01:00

Add more functions

This commit is contained in:
Aaron Piotrowski 2016-07-18 23:23:25 -05:00
parent 808ce32e3c
commit b9d554dd7b

View File

@ -2,8 +2,48 @@
namespace Amp;
use Interop\Async\Awaitable;
use Interop\Async\Loop;
/**
* @param \Amp\Observable $observable
* @param callable(mixed $value): mixed $onNext
* @param callable(mixed $value): mixed|null $onComplete
*
* @return \Amp\Observable
*/
function each(Observable $observable, callable $onNext, callable $onComplete = null) {
return new Emitter(function (callable $emit) use ($observable, $onNext, $onComplete) {
$result = (yield $observable->subscribe(function ($value) use ($emit, $onNext) {
return $emit($onNext($value));
}));
if ($onComplete === null) {
yield Coroutine::result($result);
return;
}
yield Coroutine::result($onComplete($result));
});
}
/**
* @param \Amp\Observable $observable
* @param callable(mixed $value): bool $filter
*
* @return \Amp\Observable
*/
function filter(Observable $observable, callable $filter) {
return new Emitter(function (callable $emit) use ($observable, $filter) {
yield Coroutine::result(yield $observable->subscribe(function ($value) use ($emit, $filter) {
if (!$filter($value)) {
return null;
}
return $emit($value);
}));
});
}
/**
* Creates an observable that emits values emitted from any observable in the array of observables. Values in the
* array are passed through the from() function, so they may be observables, arrays of values to emit, awaitables,
@ -39,6 +79,53 @@ function merge(array $observables) {
});
}
/**
* Creates an observable from the given array of observables, emitting the success value of each provided awaitable or
* failing if any awaitable fails.
*
* @param \Interop\Async\Awaitable[] $awaitables
*
* @return \Amp\Observable
*/
function stream(array $awaitables) {
$postponed = new Postponed;
if (empty($awaitables)) {
$postponed->complete();
return $postponed;
}
$pending = \count($awaitables);
$onResolved = function ($exception, $value) use (&$pending, $postponed) {
if ($pending <= 0) {
return;
}
if ($exception) {
$pending = 0;
$postponed->fail($exception);
return;
}
$postponed->emit($value);
if (--$pending === 0) {
$postponed->complete();
}
};
foreach ($awaitables as $awaitable) {
if (!$awaitable instanceof Awaitable) {
throw new \InvalidArgumentException("Non-awaitable provided");
}
$awaitable->when($onResolved);
}
return $postponed;
}
/**
* Returns an observable that emits a value every $interval milliseconds after the previous value has been consumed
* (up to $count times (or indefinitely if $count is 0). The value emitted is an integer of the number of times the