1
0
mirror of https://github.com/danog/amp.git synced 2024-11-30 04:29:08 +01:00

Refactor functions with Emitter

This commit is contained in:
Aaron Piotrowski 2016-06-02 10:43:46 -05:00
parent cef5c90168
commit 5715bec9e1

View File

@ -20,24 +20,23 @@ function merge(array $observables) {
}
}
$postponed = new Postponed;
return new Emitter(function (callable $emit) use ($observables) {
$subscriptions = [];
$subscriptions = [];
foreach ($observables as $observable) {
$subscriptions[] = $observable->subscribe([$postponed, 'emit']);
}
all($subscriptions)->when(function ($exception, $value) use ($postponed) {
if ($exception) {
$postponed->fail($exception);
return;
foreach ($observables as $observable) {
$subscriptions[] = $observable->subscribe($emit);
}
$postponed->resolve($value);
});
try {
$result = (yield all($subscriptions));
} finally {
foreach ($subscriptions as $subscription) {
$subscription->unsubscribe();
}
}
return $postponed->getObservable();
yield Coroutine::result($result);
});
}
/**
@ -90,23 +89,9 @@ function range($start, $end, $step = 1) {
throw new \InvalidArgumentException("Step is not of the correct sign");
}
$postponed = new Postponed;
$generator = function (Postponed $postponed, $start, $end, $step) {
return new Emitter(function (callable $emit) use ($start, $end, $step) {
for ($i = $start; $i <= $end; $i += $step) {
yield $postponed->emit($i);
yield $emit($i);
}
};
$coroutine = new Coroutine($generator($postponed, $start, $end, $step));
$coroutine->when(function ($exception) use ($postponed) {
if ($exception) {
$postponed->fail($exception);
return;
}
$postponed->resolve();
});
return $postponed->getObservable();
}