diff --git a/lib/AsyncGenerator.php b/lib/AsyncGenerator.php index c71f603..bf7e727 100644 --- a/lib/AsyncGenerator.php +++ b/lib/AsyncGenerator.php @@ -110,22 +110,6 @@ final class AsyncGenerator implements Stream $this->source->dispose(); } - /** - * @inheritDoc - */ - public function onCompletion(callable $onCompletion) - { - $this->source->onCompletion($onCompletion); - } - - /** - * @inheritDoc - */ - public function onDisposal(callable $onDisposal) - { - $this->source->onDisposal($onDisposal); - } - /** * @return Promise * diff --git a/lib/Internal/EmitSource.php b/lib/Internal/EmitSource.php index 1d65c34..90f1333 100644 --- a/lib/Internal/EmitSource.php +++ b/lib/Internal/EmitSource.php @@ -20,10 +20,10 @@ use React\Promise\PromiseInterface as ReactPromise; */ final class EmitSource { - /** @var \Throwable|null */ - private $exception; + /** @var Promise|null */ + private $result; - /** @var bool */ + /** @var bool True if complete() or fail() has been called. */ private $completed = false; /** @var mixed[] */ @@ -53,9 +53,6 @@ final class EmitSource /** @var bool */ private $used = false; - /** @var callable[]|null */ - private $onCompletion = []; - /** @var callable[]|null */ private $onDisposal = []; @@ -132,12 +129,8 @@ final class EmitSource return Promise\succeed($value); } - if ($this->exception) { - return Promise\fail($this->exception); - } - - if ($this->completed) { - return Promise\succeed(); + if ($this->result) { + return $this->result; } $this->waiting[$position] = $deferred = new Deferred; @@ -163,11 +156,11 @@ final class EmitSource */ public function dispose() { - if ($this->completed || $this->disposed) { + if ($this->result) { return; // Stream already completed or failed. } - $this->finalize(new DisposedException, true); + $this->finalize(Promise\fail(new DisposedException), true); } /** @@ -190,40 +183,13 @@ final class EmitSource return; } - if ($this->completed) { + if ($this->result) { return; } $this->onDisposal[] = $onDisposal; } - /** - * @param callable(?\Throwable):void $onDispose - * - * @return void - * - * @see Stream::onCompletion() - */ - public function onCompletion(callable $onCompletion) - { - if ($this->completed) { - try { - $onCompletion($this->exception); - } catch (\Throwable $e) { - Loop::defer(static function () use ($e) { - throw $e; - }); - } - return; - } - - if ($this->disposed) { - return; - } - - $this->onCompletion[] = $onCompletion; - } - /** * Emits a value from the stream. The returned promise is resolved once the emitted value has been consumed or * if the stream is completed, failed, or disposed. @@ -242,13 +208,12 @@ final class EmitSource */ public function emit($value): Promise { - if ($this->completed || $this->exception) { - if ($this->disposed) { - \assert($this->exception instanceof DisposedException); - return Promise\fail($this->exception); + if ($this->result) { + if ($this->completed) { + throw new \Error("Streams cannot emit values after calling complete"); } - throw new \Error("Streams cannot emit values after calling complete"); + return $this->result; } if ($value === null) { @@ -306,7 +271,7 @@ final class EmitSource */ public function complete() { - $this->finalize(); + $this->finalize(Promise\succeed()); } /** @@ -322,16 +287,16 @@ final class EmitSource throw new \Error("Cannot fail a stream with an instance of " . DisposedException::class); } - $this->finalize($exception); + $this->finalize(Promise\fail($exception)); } /** - * @param \Throwable $exception - * @param bool $disposed Flag if the generator was disposed. + * @param Promise $result + * @param bool $disposed Flag if the generator was disposed. * * @return void */ - private function finalize(\Throwable $exception = null, bool $disposed = false) + private function finalize(Promise $result, bool $disposed = false) { if ($this->completed) { $message = "Stream has already been completed"; @@ -349,9 +314,7 @@ final class EmitSource throw new \Error($message); } - $alreadyDisposed = $this->disposed; - - $this->completed = !$disposed; // $disposed is false if complete() or fail() invoked + $this->completed = $this->completed ?: !$disposed; // $disposed is false if complete() or fail() invoked $this->disposed = $this->disposed ?: $disposed; // Once disposed, do not change flag if ($this->completed) { // Record stack trace when calling complete() or fail() @@ -366,37 +329,19 @@ final class EmitSource })()); } - if ($alreadyDisposed) { + if ($this->result) { return; } - $this->exception = $exception; + $this->result = $result; $waiting = $this->waiting; $this->waiting = []; - $result = $exception ? Promise\fail($exception) : Promise\succeed(); - foreach ($waiting as $deferred) { $deferred->resolve($result); } - $onCompletion = $this->onCompletion; - $this->onCompletion = null; - - if (!$disposed) { - /** @psalm-suppress PossiblyNullIterator $alreadyDisposed is a guard against $this->onCompletion being null */ - foreach ($onCompletion as $callback) { - try { - $callback($exception); - } catch (\Throwable $e) { - Loop::defer(static function () use ($e) { - throw $e; - }); - } - } - } - $onDisposal = $this->onDisposal; $this->onDisposal = null; diff --git a/lib/Stream.php b/lib/Stream.php index e17214d..3717862 100644 --- a/lib/Stream.php +++ b/lib/Stream.php @@ -27,24 +27,4 @@ interface Stream * @return void */ public function dispose(); - - /** - * Registers a callback to be invoked *only* if the stream is disposed before being completed or failed. - * - * @param callable():void $onDisposal - * - * @return void - */ - public function onDisposal(callable $onDisposal); - - /** - * Registers a callback to be invoked when the stream is completed or failed. If the stream is failed, the exception - * used to fail the stream is given as the first argument to the callback. Null is given as the first argument if - * the stream is completed. - * - * @param callable(?\Throwable):void $onCompletion - * - * @return void - */ - public function onCompletion(callable $onCompletion); } diff --git a/lib/StreamSource.php b/lib/StreamSource.php index 7d0662c..373f3c8 100644 --- a/lib/StreamSource.php +++ b/lib/StreamSource.php @@ -56,17 +56,6 @@ final class StreamSource return $this->source->isComplete(); } - /** - * @param callable(?\Throwable):void $onCompletion - * - * @return void - */ - public function onCompletion(callable $onCompletion) - { - $this->source->onCompletion($onCompletion); - } - - /** * @return bool True if the stream has been disposed. */ diff --git a/test/AsyncGeneratorTest.php b/test/AsyncGeneratorTest.php index 8856074..438d896 100644 --- a/test/AsyncGeneratorTest.php +++ b/test/AsyncGeneratorTest.php @@ -133,8 +133,6 @@ class AsyncGeneratorTest extends AsyncTestCase return $value; }); - $generator->onDisposal($this->createCallback(0)); - $generator->onCompletion($this->createCallback(1)); $this->assertSame(0, yield $generator->continue()); $this->assertNull(yield $generator->continue()); $this->assertSame($value, yield $generator->getReturn()); @@ -152,11 +150,6 @@ class AsyncGeneratorTest extends AsyncTestCase yield $yield(yield $deferred->promise()); }); - $generator->onDisposal($this->createCallback(0)); - $generator->onCompletion($this->createCallback(1, function (\Throwable $reason = null) use ($exception) { - $this->assertSame($exception, $reason); - })); - $deferred->fail($exception); try { @@ -231,9 +224,6 @@ class AsyncGeneratorTest extends AsyncTestCase yield $generator->continue(); - $generator->onDisposal($this->createCallback(1)); - $generator->onCompletion($this->createCallback(0)); - unset($generator); // Should call dispose() on the internal stream. $this->assertInstanceOf(DisposedException::class, $exception); @@ -256,13 +246,8 @@ class AsyncGeneratorTest extends AsyncTestCase yield $generator->continue(); - $generator->onDisposal($this->createCallback(1)); - $generator->onCompletion($this->createCallback(0)); - $generator->dispose(); - $generator->onDisposal($this->createCallback(1)); - $this->expectException(DisposedException::class); yield $generator->getReturn(); @@ -281,8 +266,5 @@ class AsyncGeneratorTest extends AsyncTestCase $this->assertSame(0, yield $generator->continue()); $this->assertTrue($invoked); - - $generator->onDisposal($this->createCallback(1)); - $generator->onCompletion($this->createCallback(0)); } } diff --git a/test/StreamSourceTest.php b/test/StreamSourceTest.php index c7eb38b..b00dd7f 100644 --- a/test/StreamSourceTest.php +++ b/test/StreamSourceTest.php @@ -249,10 +249,8 @@ class StreamSourceTest extends AsyncTestCase $stream = $this->source->stream(); $promise = $this->source->emit(1); $this->source->onDisposal($this->createCallback(1)); - $this->source->onCompletion($this->createCallback(0)); $stream->dispose(); $this->source->onDisposal($this->createCallback(1)); - $this->source->onCompletion($this->createCallback(0)); $this->assertTrue($this->source->isDisposed()); $this->assertNull(yield $promise); yield $this->source->emit(1); @@ -267,58 +265,13 @@ class StreamSourceTest extends AsyncTestCase $stream = $this->source->stream(); $promise = $this->source->emit(1); $this->source->onDisposal($this->createCallback(1)); - $this->source->onCompletion($this->createCallback(0)); unset($stream); $this->source->onDisposal($this->createCallback(1)); - $this->source->onCompletion($this->createCallback(0)); $this->assertTrue($this->source->isDisposed()); $this->assertNull(yield $promise); yield $this->source->emit(1); } - public function testOnCompletionWithSuccessfulStream() - { - $invoked = false; - $this->source->onCompletion(function (\Throwable $exception = null) use (&$invoked) { - $this->assertNull($exception); - $invoked = true; - }); - - $this->source->onDisposal($this->createCallback(0)); - - $this->assertFalse($invoked); - - $this->source->complete(); - - $this->assertTrue($invoked); - - $this->source->onCompletion($this->createCallback(1, function (\Throwable $exception = null) { - $this->assertNull($exception); - })); - } - - public function testOnCompletionWithFailedStream() - { - $reason = new \Exception; - $invoked = false; - $this->source->onCompletion(function (\Throwable $exception = null) use (&$invoked, $reason) { - $this->assertSame($reason, $exception); - $invoked = true; - }); - - $this->source->onDisposal($this->createCallback(0)); - - $this->assertFalse($invoked); - - $this->source->fail($reason); - - $this->assertTrue($invoked); - - $this->source->onCompletion($this->createCallback(1, function (\Throwable $exception = null) use ($reason) { - $this->assertSame($reason, $exception); - })); - } - public function testFailWithDisposedException() { $this->expectException(\Error::class);