1
0
mirror of https://github.com/danog/amp.git synced 2024-12-04 18:38:17 +01:00

Fix pipeline functions

This commit is contained in:
Aaron Piotrowski 2020-10-02 13:55:58 -05:00
parent bf58e595be
commit d62ba46fb4
No known key found for this signature in database
GPG Key ID: ADD1EF783EDE9EEB

View File

@ -59,10 +59,10 @@ namespace Amp
Loop::defer(static fn () => \Fiber::run(static function () use ($deferred, $callback, $args): void { Loop::defer(static fn () => \Fiber::run(static function () use ($deferred, $callback, $args): void {
try { try {
$deferred->resolve($callback(...$args)); $deferred->resolve($callback(...$args));
} catch (\Throwable $e) { } catch (\Throwable $exception) {
$deferred->fail($e); $deferred->fail($exception);
} }
}, ...$args)); }));
return $deferred->promise(); return $deferred->promise();
} }
@ -874,10 +874,10 @@ namespace Amp\Pipeline
use Amp\PipelineSource; use Amp\PipelineSource;
use Amp\Promise; use Amp\Promise;
use React\Promise\PromiseInterface as ReactPromise; use React\Promise\PromiseInterface as ReactPromise;
use function Amp\async;
use function Amp\asyncCallable;
use function Amp\await; use function Amp\await;
use function Amp\call; use function Amp\sleep;
use function Amp\coroutine;
use function Amp\delay;
use function Amp\Internal\createTypeError; use function Amp\Internal\createTypeError;
/** /**
@ -902,7 +902,7 @@ namespace Amp\Pipeline
return new AsyncGenerator(static function () use ($iterable, $delay): \Generator { return new AsyncGenerator(static function () use ($iterable, $delay): \Generator {
foreach ($iterable as $value) { foreach ($iterable as $value) {
if ($delay) { if ($delay) {
await(delay($delay)); sleep($delay);
} }
if ($value instanceof Promise || $value instanceof ReactPromise) { if ($value instanceof Promise || $value instanceof ReactPromise) {
@ -971,9 +971,9 @@ namespace Amp\Pipeline
$source = new PipelineSource; $source = new PipelineSource;
$result = $source->pipe(); $result = $source->pipe();
$coroutine = coroutine(static function (Pipeline $stream) use (&$source) { $coroutine = asyncCallable(static function (Pipeline $stream) use (&$source) {
while ((null !== $value = yield $stream->continue()) && $source !== null) { while ((null !== $value = $stream->continue()) && $source !== null) {
yield $source->emit($value); $source->yield($value);
} }
}); });
@ -1039,10 +1039,10 @@ namespace Amp\Pipeline
*/ */
function discard(Pipeline $pipeline): Promise function discard(Pipeline $pipeline): Promise
{ {
return call(static function () use ($stream): \Generator { return async(static function () use ($pipeline): int {
$count = 0; $count = 0;
while (null !== yield $stream->continue()) { while (null !== $pipeline->continue()) {
$count++; $count++;
} }