From f4cc5919888ddd2418e47a1497c46404ac21df92 Mon Sep 17 00:00:00 2001 From: Aaron Piotrowski Date: Sat, 11 Jul 2020 09:31:35 -0500 Subject: [PATCH] Feedback update --- lib/Internal/EmitSource.php | 5 +- lib/functions.php | 116 +++++++----------------------------- 2 files changed, 25 insertions(+), 96 deletions(-) diff --git a/lib/Internal/EmitSource.php b/lib/Internal/EmitSource.php index 1b59c76..cab4cee 100644 --- a/lib/Internal/EmitSource.php +++ b/lib/Internal/EmitSource.php @@ -21,6 +21,9 @@ use React\Promise\PromiseInterface as ReactPromise; */ final class EmitSource { + /** @var Success */ + private static $success; + /** @var Promise|null */ private $result; @@ -61,7 +64,7 @@ final class EmitSource */ public function continue(): Promise { - return $this->next(new Success); + return $this->next(self::$success ?? (self::$success = new Success)); } /** diff --git a/lib/functions.php b/lib/functions.php index 4eccf40..db09b5c 100644 --- a/lib/functions.php +++ b/lib/functions.php @@ -714,50 +714,13 @@ namespace Amp\Iterator } } - $emitter = new Emitter; - $previous = []; - $promise = Promise\all($previous); - - $coroutine = coroutine(static function (Iterator $iterator, callable $emit) { - while (yield $iterator->advance()) { - yield $emit($iterator->getCurrent()); + return new Producer(function (callable $emit) use ($iterators) { + foreach ($iterators as $iterator) { + while (yield $iterator->advance()) { + yield $emit($iterator->getCurrent()); + } } }); - - foreach ($iterators as $iterator) { - $emit = coroutine(static function ($value) use ($emitter, $promise) { - static $pending = true, $failed = false; - - if ($failed) { - return; - } - - if ($pending) { - try { - yield $promise; - $pending = false; - } catch (\Throwable $exception) { - $failed = true; - return; // Prior iterator failed. - } - } - - yield $emitter->emit($value); - }); - $previous[] = $coroutine($iterator, $emit); - $promise = Promise\all($previous); - } - - $promise->onResolve(static function ($exception) use ($emitter) { - if ($exception) { - $emitter->fail($exception); - return; - } - - $emitter->complete(); - }); - - return $emitter->iterate(); } /** @@ -840,6 +803,7 @@ namespace Amp\Stream use Amp\Promise; use Amp\Stream; use Amp\StreamSource; + use React\Promise\PromiseInterface as ReactPromise; use function Amp\call; use function Amp\coroutine; use function Amp\Internal\createTypeError; @@ -869,18 +833,17 @@ namespace Amp\Stream throw createTypeError(["array", "Traversable"], $iterable); } - if ($delay) { - return new AsyncGenerator(static function (callable $yield) use ($iterable, $delay) { - foreach ($iterable as $value) { - yield new Delayed($delay); - yield $yield($value instanceof Promise ? yield $value : $value); - } - }); - } - - return new AsyncGenerator(static function (callable $yield) use ($iterable) { + return new AsyncGenerator(static function (callable $yield) use ($iterable, $delay) { foreach ($iterable as $value) { - yield $yield($value instanceof Promise ? yield $value : $value); + if ($delay) { + yield new Delayed($delay); + } + + if ($value instanceof Promise || $value instanceof ReactPromise) { + $value = yield $value; + } + + yield $yield($value); } }); } @@ -988,50 +951,13 @@ namespace Amp\Stream } } - $source = new StreamSource; - $previous = []; - $promise = Promise\all($previous); - - $coroutine = coroutine(static function (Stream $stream, callable $yield) { - while (null !== $value = yield $stream->continue()) { - yield $yield($value); + return new AsyncGenerator(function (callable $emit) use ($streams) { + foreach ($streams as $stream) { + while ($value = yield $stream->continue()) { + yield $emit($value); + } } }); - - foreach ($streams as $iterator) { - $emit = coroutine(static function ($value) use ($source, $promise) { - static $pending = true, $failed = false; - - if ($failed) { - return; - } - - if ($pending) { - try { - yield $promise; - $pending = false; - } catch (\Throwable $exception) { - $failed = true; - return; // Prior iterator failed. - } - } - - yield $source->emit($value); - }); - $previous[] = $coroutine($iterator, $emit); - $promise = Promise\all($previous); - } - - $promise->onResolve(static function ($exception) use ($source) { - if ($exception) { - $source->fail($exception); - return; - } - - $source->complete(); - }); - - return $source->stream(); } /**