diff --git a/lib/Internal/EmitSource.php b/lib/Internal/EmitSource.php index 97e3d14..1a03aa9 100644 --- a/lib/Internal/EmitSource.php +++ b/lib/Internal/EmitSource.php @@ -208,14 +208,6 @@ final class EmitSource */ public function emit($value): Promise { - if ($this->result) { - if ($this->completed) { - throw new \Error("Pipelines cannot emit values after calling complete"); - } - - return $this->result; - } - if ($value === null) { throw new \TypeError("Pipelines cannot emit NULL"); } @@ -237,10 +229,24 @@ final class EmitSource unset($this->sendValues[$position]); return $promise; } + } elseif ($this->result) { + if ($this->completed) { + throw new \Error("Pipelines cannot emit values after calling complete"); + } + + return $this->result; } else { $this->emittedValues[$position] = $value; } + if ($this->disposed) { + if (empty($this->waiting)) { + $this->triggerDisposal(); + } + + return Promise\succeed(); + } + $this->backPressure[$position] = $deferred = new Deferred; return $deferred->promise(); @@ -259,7 +265,7 @@ final class EmitSource */ public function isDisposed(): bool { - return $this->disposed; + return $this->disposed && empty($this->waiting); } /** @@ -335,33 +341,44 @@ final class EmitSource $this->result = $result; - $waiting = $this->waiting; - $this->waiting = []; + if ($disposed) { + if (empty($this->waiting)) { + $this->triggerDisposal(); + } + } else { + $waiting = $this->waiting; + $this->waiting = []; - foreach ($waiting as $deferred) { - $deferred->resolve($result); + foreach ($waiting as $deferred) { + $deferred->resolve($result); + } + } + } + + private function triggerDisposal() + { + if ($this->onDisposal === null) { + return; } $onDisposal = $this->onDisposal; $this->onDisposal = null; - if ($disposed) { - $backPressure = $this->backPressure; - $this->backPressure = []; + $backPressure = $this->backPressure; + $this->backPressure = []; - foreach ($backPressure as $deferred) { - $deferred->resolve($result); - } + foreach ($backPressure as $deferred) { + $deferred->resolve($this->result); + } - /** @psalm-suppress PossiblyNullIterator $alreadyDisposed is a guard against $this->onDisposal being null */ - foreach ($onDisposal as $callback) { - try { - $callback(); - } catch (\Throwable $e) { - Loop::defer(static function () use ($e) { - throw $e; - }); - } + /** @psalm-suppress PossiblyNullIterator $alreadyDisposed is a guard against $this->onDisposal being null */ + foreach ($onDisposal as $callback) { + try { + $callback(); + } catch (\Throwable $e) { + Loop::defer(static function () use ($e) { + throw $e; + }); } } } diff --git a/test/PipelineSourceTest.php b/test/PipelineSourceTest.php index dd10e31..f9d64af 100644 --- a/test/PipelineSourceTest.php +++ b/test/PipelineSourceTest.php @@ -256,6 +256,23 @@ class PipelineSourceTest extends AsyncTestCase yield $this->source->emit(1); } + public function testEmitAfterDisposalWithPendingContinuePromise() + { + $pipeline = $this->source->pipe(); + $promise = $pipeline->continue(); + $this->source->onDisposal($this->createCallback(1)); + $pipeline->dispose(); + $this->source->onDisposal($this->createCallback(1)); + $this->assertFalse($this->source->isDisposed()); + yield $this->source->emit(1); + $this->assertSame(1, yield $promise); + + $this->expectException(DisposedException::class); + $this->expectExceptionMessage('The pipeline has been disposed'); + + $this->assertTrue($this->source->isDisposed()); + yield $this->source->emit(2); + } public function testEmitAfterDestruct() {