1
0
mirror of https://github.com/danog/amp.git synced 2024-12-02 17:37:50 +01:00

Feedback update

This commit is contained in:
Aaron Piotrowski 2020-07-11 09:31:35 -05:00
parent b64b3affc9
commit f4cc591988
No known key found for this signature in database
GPG Key ID: ADD1EF783EDE9EEB
2 changed files with 25 additions and 96 deletions

View File

@ -21,6 +21,9 @@ use React\Promise\PromiseInterface as ReactPromise;
*/ */
final class EmitSource final class EmitSource
{ {
/** @var Success */
private static $success;
/** @var Promise|null */ /** @var Promise|null */
private $result; private $result;
@ -61,7 +64,7 @@ final class EmitSource
*/ */
public function continue(): Promise public function continue(): Promise
{ {
return $this->next(new Success); return $this->next(self::$success ?? (self::$success = new Success));
} }
/** /**

View File

@ -714,50 +714,13 @@ namespace Amp\Iterator
} }
} }
$emitter = new Emitter; return new Producer(function (callable $emit) use ($iterators) {
$previous = []; foreach ($iterators as $iterator) {
$promise = Promise\all($previous);
$coroutine = coroutine(static function (Iterator $iterator, callable $emit) {
while (yield $iterator->advance()) { while (yield $iterator->advance()) {
yield $emit($iterator->getCurrent()); yield $emit($iterator->getCurrent());
} }
}
}); });
foreach ($iterators as $iterator) {
$emit = coroutine(static function ($value) use ($emitter, $promise) {
static $pending = true, $failed = false;
if ($failed) {
return;
}
if ($pending) {
try {
yield $promise;
$pending = false;
} catch (\Throwable $exception) {
$failed = true;
return; // Prior iterator failed.
}
}
yield $emitter->emit($value);
});
$previous[] = $coroutine($iterator, $emit);
$promise = Promise\all($previous);
}
$promise->onResolve(static function ($exception) use ($emitter) {
if ($exception) {
$emitter->fail($exception);
return;
}
$emitter->complete();
});
return $emitter->iterate();
} }
/** /**
@ -840,6 +803,7 @@ namespace Amp\Stream
use Amp\Promise; use Amp\Promise;
use Amp\Stream; use Amp\Stream;
use Amp\StreamSource; use Amp\StreamSource;
use React\Promise\PromiseInterface as ReactPromise;
use function Amp\call; use function Amp\call;
use function Amp\coroutine; use function Amp\coroutine;
use function Amp\Internal\createTypeError; use function Amp\Internal\createTypeError;
@ -869,18 +833,17 @@ namespace Amp\Stream
throw createTypeError(["array", "Traversable"], $iterable); throw createTypeError(["array", "Traversable"], $iterable);
} }
if ($delay) {
return new AsyncGenerator(static function (callable $yield) use ($iterable, $delay) { return new AsyncGenerator(static function (callable $yield) use ($iterable, $delay) {
foreach ($iterable as $value) { foreach ($iterable as $value) {
if ($delay) {
yield new Delayed($delay); yield new Delayed($delay);
yield $yield($value instanceof Promise ? yield $value : $value);
}
});
} }
return new AsyncGenerator(static function (callable $yield) use ($iterable) { if ($value instanceof Promise || $value instanceof ReactPromise) {
foreach ($iterable as $value) { $value = yield $value;
yield $yield($value instanceof Promise ? yield $value : $value); }
yield $yield($value);
} }
}); });
} }
@ -988,50 +951,13 @@ namespace Amp\Stream
} }
} }
$source = new StreamSource; return new AsyncGenerator(function (callable $emit) use ($streams) {
$previous = []; foreach ($streams as $stream) {
$promise = Promise\all($previous); while ($value = yield $stream->continue()) {
yield $emit($value);
$coroutine = coroutine(static function (Stream $stream, callable $yield) { }
while (null !== $value = yield $stream->continue()) {
yield $yield($value);
} }
}); });
foreach ($streams as $iterator) {
$emit = coroutine(static function ($value) use ($source, $promise) {
static $pending = true, $failed = false;
if ($failed) {
return;
}
if ($pending) {
try {
yield $promise;
$pending = false;
} catch (\Throwable $exception) {
$failed = true;
return; // Prior iterator failed.
}
}
yield $source->emit($value);
});
$previous[] = $coroutine($iterator, $emit);
$promise = Promise\all($previous);
}
$promise->onResolve(static function ($exception) use ($source) {
if ($exception) {
$source->fail($exception);
return;
}
$source->complete();
});
return $source->stream();
} }
/** /**