From 865238bc1643cd6a02e9fcc67e1216296681c0eb Mon Sep 17 00:00:00 2001 From: Aaron Piotrowski Date: Tue, 3 Nov 2020 16:55:29 -0600 Subject: [PATCH] Use fiber directly when emitting values --- lib/Internal/EmitSource.php | 96 ++++++++++++++++++------------------- test/PipelineSourceTest.php | 4 ++ 2 files changed, 52 insertions(+), 48 deletions(-) diff --git a/lib/Internal/EmitSource.php b/lib/Internal/EmitSource.php index e45ef89..298cb8e 100644 --- a/lib/Internal/EmitSource.php +++ b/lib/Internal/EmitSource.php @@ -9,7 +9,7 @@ use Amp\Loop; use Amp\Pipeline; use Amp\Promise; use Amp\Success; -use function Amp\await; +use function Amp\defer; /** * Class used internally by {@see Pipeline} implementations. Do not use this class in your code, instead compose your @@ -22,16 +22,20 @@ use function Amp\await; */ final class EmitSource { - private ?Promise $result = null; - private bool $completed = false; + private \Throwable $exception; + + /** @var mixed[] */ private array $emittedValues = []; + /** @var Promise[] */ private array $sendValues = []; + /** @var Deferred[] */ private array $backPressure = []; + /** @var \Fiber[] */ private array $waiting = []; private int $consumePosition = 0; @@ -44,6 +48,7 @@ final class EmitSource private bool $used = false; + /** @var callable[]|null */ private ?array $onDisposal = []; /** @@ -101,18 +106,17 @@ final class EmitSource if (\array_key_exists($position, $this->emittedValues)) { $value = $this->emittedValues[$position]; unset($this->emittedValues[$position]); - - // Defer next value to avoid creating a blocking loop. - return await(new Success($value)); + return $value; } - if ($this->result) { - return await($this->result); + if ($this->completed || $this->disposed) { + if (isset($this->exception)) { + throw $this->exception; + } + return null; } - $this->waiting[$position] = $deferred = new Deferred; - - return await($deferred->promise()); + return \Fiber::suspend(fn(\Fiber $fiber) => $this->waiting[$position] = $fiber, Loop::get()); } public function pipe(): Pipeline @@ -144,11 +148,11 @@ final class EmitSource private function cancel(bool $cancelPending): void { try { - if ($this->result) { + if ($this->completed || $this->disposed) { return; // Pipeline already completed or failed. } - $this->finalize(new Failure(new DisposedException), true); + $this->finalize(new DisposedException, true); } finally { if ($this->disposed && $cancelPending) { $this->triggerDisposal(); @@ -166,17 +170,11 @@ final class EmitSource public function onDisposal(callable $onDisposal): void { if ($this->disposed) { - try { - $onDisposal(); - } catch (\Throwable $e) { - Loop::defer(static function () use ($e) { - throw $e; - }); - } + defer($onDisposal); return; } - if ($this->result) { + if ($this->completed) { return; } @@ -212,9 +210,9 @@ final class EmitSource $position = $this->emitPosition++; if (isset($this->waiting[$position])) { - $deferred = $this->waiting[$position]; + $fiber = $this->waiting[$position]; unset($this->waiting[$position]); - $deferred->resolve($value); + Loop::defer(fn() => $fiber->resume($value)); // Send-values are indexed as $this->consumePosition - 1, so use $position for the next value. if (isset($this->sendValues[$position])) { @@ -222,12 +220,10 @@ final class EmitSource unset($this->sendValues[$position]); return $promise; } - } elseif ($this->result) { - if ($this->completed) { - throw new \Error("Pipelines cannot emit values after calling complete"); - } - - return $this->result; + } elseif ($this->completed) { + throw new \Error("Pipelines cannot emit values after calling complete"); + } elseif (isset($this->exception)) { + return new Failure($this->exception); } else { $this->emittedValues[$position] = $value; } @@ -270,7 +266,7 @@ final class EmitSource */ public function complete(): void { - $this->finalize(new Success); + $this->finalize(); } /** @@ -286,16 +282,16 @@ final class EmitSource throw new \Error("Cannot fail a pipeline with an instance of " . DisposedException::class); } - $this->finalize(new Failure($exception)); + $this->finalize($exception); } /** - * @param Promise $result - * @param bool $disposed Flag if the generator was disposed. + * @param \Throwable|null $exception Failure reason or null for success. + * @param bool $disposed Flag if the generator was disposed. * * @return void */ - private function finalize(Promise $result, bool $disposed = false): void + private function finalize(?\Throwable $exception = null, bool $disposed = false): void { if ($this->completed) { $message = "Pipeline has already been completed"; @@ -328,18 +324,20 @@ final class EmitSource })()); } - if ($this->result) { + if (isset($this->exception)) { return; } - $this->result = $result; + if ($exception !== null) { + $this->exception = $exception; + } if ($this->disposed) { if (empty($this->waiting)) { $this->triggerDisposal(); } } else { - $this->resolvePending(); + Loop::defer(fn() => $this->resolvePending()); } } @@ -355,11 +353,19 @@ final class EmitSource $this->waiting = []; foreach ($backPressure as $deferred) { - $deferred->resolve($this->result); + if (isset($this->exception)) { + $deferred->fail($this->exception); + } else { + $deferred->resolve(); + } } - foreach ($waiting as $deferred) { - $deferred->resolve($this->result); + foreach ($waiting as $fiber) { + if (isset($this->exception)) { + $fiber->throw($this->exception); + } else { + $fiber->resume(); + } } } @@ -377,17 +383,11 @@ final class EmitSource $onDisposal = $this->onDisposal; $this->onDisposal = null; - $this->resolvePending(); + Loop::defer(fn() => $this->resolvePending()); /** @psalm-suppress PossiblyNullIterator $alreadyDisposed is a guard against $this->onDisposal being null */ foreach ($onDisposal as $callback) { - try { - $callback(); - } catch (\Throwable $e) { - Loop::defer(static function () use ($e) { - throw $e; - }); - } + defer($callback); } } } diff --git a/test/PipelineSourceTest.php b/test/PipelineSourceTest.php index 003f964..2ee3843 100644 --- a/test/PipelineSourceTest.php +++ b/test/PipelineSourceTest.php @@ -217,6 +217,8 @@ class PipelineSourceTest extends AsyncTestCase $pipeline = $this->source->pipe(); $pipeline->dispose(); + delay(0); + $this->assertTrue($invoked); $this->source->onDisposal($this->createCallback(1)); @@ -239,6 +241,8 @@ class PipelineSourceTest extends AsyncTestCase $this->assertFalse($invoked); $this->source->onDisposal($this->createCallback(0)); + + delay(0); } public function testEmitAfterDisposal(): void