From d62ba46fb476ffacd7d57b9776237e36bfc5d1d4 Mon Sep 17 00:00:00 2001 From: Aaron Piotrowski Date: Fri, 2 Oct 2020 13:55:58 -0500 Subject: [PATCH] Fix pipeline functions --- lib/functions.php | 24 ++++++++++++------------ 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/lib/functions.php b/lib/functions.php index 0fde36c..3a6a228 100644 --- a/lib/functions.php +++ b/lib/functions.php @@ -59,10 +59,10 @@ namespace Amp Loop::defer(static fn () => \Fiber::run(static function () use ($deferred, $callback, $args): void { try { $deferred->resolve($callback(...$args)); - } catch (\Throwable $e) { - $deferred->fail($e); + } catch (\Throwable $exception) { + $deferred->fail($exception); } - }, ...$args)); + })); return $deferred->promise(); } @@ -874,10 +874,10 @@ namespace Amp\Pipeline use Amp\PipelineSource; use Amp\Promise; use React\Promise\PromiseInterface as ReactPromise; + use function Amp\async; + use function Amp\asyncCallable; use function Amp\await; - use function Amp\call; - use function Amp\coroutine; - use function Amp\delay; + use function Amp\sleep; use function Amp\Internal\createTypeError; /** @@ -902,7 +902,7 @@ namespace Amp\Pipeline return new AsyncGenerator(static function () use ($iterable, $delay): \Generator { foreach ($iterable as $value) { if ($delay) { - await(delay($delay)); + sleep($delay); } if ($value instanceof Promise || $value instanceof ReactPromise) { @@ -971,9 +971,9 @@ namespace Amp\Pipeline $source = new PipelineSource; $result = $source->pipe(); - $coroutine = coroutine(static function (Pipeline $stream) use (&$source) { - while ((null !== $value = yield $stream->continue()) && $source !== null) { - yield $source->emit($value); + $coroutine = asyncCallable(static function (Pipeline $stream) use (&$source) { + while ((null !== $value = $stream->continue()) && $source !== null) { + $source->yield($value); } }); @@ -1039,10 +1039,10 @@ namespace Amp\Pipeline */ function discard(Pipeline $pipeline): Promise { - return call(static function () use ($stream): \Generator { + return async(static function () use ($pipeline): int { $count = 0; - while (null !== yield $stream->continue()) { + while (null !== $pipeline->continue()) { $count++; }