1
0
mirror of https://github.com/danog/amp.git synced 2024-11-30 04:29:08 +01:00
amp/lib/functions.php

98 lines
2.6 KiB
PHP
Raw Normal View History

2016-05-24 18:47:14 +02:00
<?php
namespace Amp;
use Interop\Async\Loop;
/**
* Creates an observable that emits values emitted from any observable in the array of observables. Values in the
* array are passed through the from() function, so they may be observables, arrays of values to emit, awaitables,
* or any other value.
*
* @param \Amp\Observable[] $observables
*
* @return \Amp\Observable
*/
function merge(array $observables) {
foreach ($observables as $observable) {
if (!$observable instanceof Observable) {
throw new \InvalidArgumentException("Non-observable provided");
}
}
2016-06-02 17:43:46 +02:00
return new Emitter(function (callable $emit) use ($observables) {
$subscriptions = [];
2016-05-24 18:47:14 +02:00
2016-06-02 17:43:46 +02:00
foreach ($observables as $observable) {
$subscriptions[] = $observable->subscribe($emit);
}
2016-05-27 01:20:05 +02:00
2016-06-02 17:43:46 +02:00
try {
$result = (yield all($subscriptions));
} finally {
foreach ($subscriptions as $subscription) {
$subscription->unsubscribe();
}
2016-05-24 18:47:14 +02:00
}
2016-06-02 17:43:46 +02:00
yield Coroutine::result($result);
2016-05-24 18:47:14 +02:00
});
}
/**
* Returns an observable that emits a value every $interval milliseconds after the previous value has been consumed
* (up to $count times (or indefinitely if $count is 0). The value emitted is an integer of the number of times the
* observable emitted a value.
*
* @param int $interval Time interval between emitted values in milliseconds.
* @param int $count Use 0 to emit values indefinitely.
*
* @return \Amp\Observable
*/
function interval($interval, $count = 0) {
$count = (int) $count;
if (0 > $count) {
throw new \InvalidArgumentException("The number of times to emit must be a non-negative value");
}
2016-05-27 01:20:05 +02:00
$postponed = new Postponed;
Loop::repeat($interval, function ($watcher) use (&$i, $postponed, $count) {
$postponed->emit(++$i);
if ($i === $count) {
2016-05-24 18:47:14 +02:00
Loop::cancel($watcher);
2016-05-29 18:35:09 +02:00
$postponed->resolve();
2016-05-24 18:47:14 +02:00
}
});
2016-05-27 01:20:05 +02:00
return $postponed->getObservable();
2016-05-24 18:47:14 +02:00
}
/**
* @param int $start
* @param int $end
* @param int $step
*
* @return \Amp\Observable
*/
function range($start, $end, $step = 1) {
$start = (int) $start;
$end = (int) $end;
$step = (int) $step;
if (0 === $step) {
throw new \InvalidArgumentException("Step must be a non-zero integer");
}
if ((($end - $start) ^ $step) < 0) {
throw new \InvalidArgumentException("Step is not of the correct sign");
}
2016-06-02 17:43:46 +02:00
return new Emitter(function (callable $emit) use ($start, $end, $step) {
2016-05-24 18:47:14 +02:00
for ($i = $start; $i <= $end; $i += $step) {
2016-06-02 17:43:46 +02:00
yield $emit($i);
2016-05-27 01:20:05 +02:00
}
2016-05-24 18:47:14 +02:00
});
}