mirror of
https://github.com/danog/amp.git
synced 2024-11-26 20:15:00 +01:00
Simplify Iterator\concat implementation
This commit is contained in:
parent
3fcc4d5de7
commit
d81cc35902
@ -571,49 +571,27 @@ namespace Amp\Iterator
|
||||
}
|
||||
}
|
||||
|
||||
$emitter = new Emitter;
|
||||
$previous = [];
|
||||
$promise = Promise\all($previous);
|
||||
|
||||
$coroutine = coroutine(function (Iterator $iterator, callable $emit) {
|
||||
return new Producer(function (callable $emit) use ($iterators) {
|
||||
$exception = null;
|
||||
foreach ($iterators as $iterator) {
|
||||
try {
|
||||
while (yield $iterator->advance()) {
|
||||
if ($exception !== null) {
|
||||
continue;
|
||||
}
|
||||
|
||||
yield $emit($iterator->getCurrent());
|
||||
}
|
||||
} catch (\Throwable $e) {
|
||||
if ($exception === null) {
|
||||
$exception = $e;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if ($exception !== null) {
|
||||
throw $exception;
|
||||
}
|
||||
});
|
||||
|
||||
foreach ($iterators as $iterator) {
|
||||
$emit = coroutine(function ($value) use ($emitter, $promise) {
|
||||
static $pending = true, $failed = false;
|
||||
|
||||
if ($failed) {
|
||||
return;
|
||||
}
|
||||
|
||||
if ($pending) {
|
||||
try {
|
||||
yield $promise;
|
||||
$pending = false;
|
||||
} catch (\Throwable $exception) {
|
||||
$failed = true;
|
||||
return; // Prior iterator failed.
|
||||
}
|
||||
}
|
||||
|
||||
yield $emitter->emit($value);
|
||||
});
|
||||
$previous[] = $coroutine($iterator, $emit);
|
||||
$promise = Promise\all($previous);
|
||||
}
|
||||
|
||||
$promise->onResolve(function ($exception) use ($emitter) {
|
||||
if ($exception) {
|
||||
$emitter->fail($exception);
|
||||
return;
|
||||
}
|
||||
|
||||
$emitter->complete();
|
||||
});
|
||||
|
||||
return $emitter->iterate();
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user