diff --git a/lib/AsyncGenerator.php b/lib/AsyncGenerator.php index 2fb99b4..c71f603 100644 --- a/lib/AsyncGenerator.php +++ b/lib/AsyncGenerator.php @@ -110,6 +110,22 @@ final class AsyncGenerator implements Stream $this->source->dispose(); } + /** + * @inheritDoc + */ + public function onCompletion(callable $onCompletion) + { + $this->source->onCompletion($onCompletion); + } + + /** + * @inheritDoc + */ + public function onDisposal(callable $onDisposal) + { + $this->source->onDisposal($onDisposal); + } + /** * @return Promise * @@ -127,7 +143,7 @@ final class AsyncGenerator implements Stream $source = $this->source; $this->coroutine->onResolve(static function ($exception) use ($source) { - if ($source->isComplete()) { + if ($source->isDisposed()) { return; // AsyncGenerator object was destroyed. } diff --git a/lib/Internal/AutoDisposingStream.php b/lib/Internal/AutoDisposingStream.php index 4b8f5ee..a7561cf 100644 --- a/lib/Internal/AutoDisposingStream.php +++ b/lib/Internal/AutoDisposingStream.php @@ -44,4 +44,20 @@ final class AutoDisposingStream implements Stream { $this->source->dispose(); } + + /** + * @inheritDoc + */ + public function onCompletion(callable $onCompletion) + { + $this->source->onCompletion($onCompletion); + } + + /** + * @inheritDoc + */ + public function onDisposal(callable $onDisposal) + { + $this->source->onDisposal($onDisposal); + } } diff --git a/lib/Internal/EmitSource.php b/lib/Internal/EmitSource.php index 01e883c..1d65c34 100644 --- a/lib/Internal/EmitSource.php +++ b/lib/Internal/EmitSource.php @@ -20,8 +20,8 @@ use React\Promise\PromiseInterface as ReactPromise; */ final class EmitSource { - /** @var Promise|null */ - private $result; + /** @var \Throwable|null */ + private $exception; /** @var bool */ private $completed = false; @@ -53,6 +53,9 @@ final class EmitSource /** @var bool */ private $used = false; + /** @var callable[]|null */ + private $onCompletion = []; + /** @var callable[]|null */ private $onDisposal = []; @@ -129,8 +132,12 @@ final class EmitSource return Promise\succeed($value); } - if ($this->result) { - return $this->result; + if ($this->exception) { + return Promise\fail($this->exception); + } + + if ($this->completed) { + return Promise\succeed(); } $this->waiting[$position] = $deferred = new Deferred; @@ -150,44 +157,73 @@ final class EmitSource } /** - * @see Stream::dispose() - * * @return void + * + * @see Stream::dispose() */ public function dispose() { - if ($this->result) { + if ($this->completed || $this->disposed) { return; // Stream already completed or failed. } - $this->finalize(Promise\fail(new DisposedException), true); + $this->finalize(new DisposedException, true); } /** - * @see Stream::onDisposal() - * * @param callable():void $onDispose * * @return void + * + * @see Stream::onDisposal() */ public function onDisposal(callable $onDisposal) { - if ($this->result) { - if ($this->disposed) { - try { - $onDisposal(); - } catch (\Throwable $e) { - Loop::defer(static function () use ($e) { - throw $e; - }); - } + if ($this->disposed) { + try { + $onDisposal(); + } catch (\Throwable $e) { + Loop::defer(static function () use ($e) { + throw $e; + }); } return; } + if ($this->completed) { + return; + } + $this->onDisposal[] = $onDisposal; } + /** + * @param callable(?\Throwable):void $onDispose + * + * @return void + * + * @see Stream::onCompletion() + */ + public function onCompletion(callable $onCompletion) + { + if ($this->completed) { + try { + $onCompletion($this->exception); + } catch (\Throwable $e) { + Loop::defer(static function () use ($e) { + throw $e; + }); + } + return; + } + + if ($this->disposed) { + return; + } + + $this->onCompletion[] = $onCompletion; + } + /** * Emits a value from the stream. The returned promise is resolved once the emitted value has been consumed or * if the stream is completed, failed, or disposed. @@ -206,9 +242,10 @@ final class EmitSource */ public function emit($value): Promise { - if ($this->result) { + if ($this->completed || $this->exception) { if ($this->disposed) { - return $this->result; // Promise failed with an instance of DisposedException. + \assert($this->exception instanceof DisposedException); + return Promise\fail($this->exception); } throw new \Error("Streams cannot emit values after calling complete"); @@ -262,14 +299,14 @@ final class EmitSource /** * Completes the stream. - ** + * * @return void * * @throws \Error If the iterator has already been completed. */ public function complete() { - $this->finalize(Promise\succeed()); + $this->finalize(); } /** @@ -281,16 +318,20 @@ final class EmitSource */ public function fail(\Throwable $exception) { - $this->finalize(Promise\fail($exception)); + if ($exception instanceof DisposedException) { + throw new \Error("Cannot fail a stream with an instance of " . DisposedException::class); + } + + $this->finalize($exception); } /** - * @param Promise $result Promise with the generator result, either a null success or a failed promise. - * @param bool $disposed Flag if the generator was disposed. + * @param \Throwable $exception + * @param bool $disposed Flag if the generator was disposed. * * @return void */ - private function finalize(Promise $result, bool $disposed = false) + private function finalize(\Throwable $exception = null, bool $disposed = false) { if ($this->completed) { $message = "Stream has already been completed"; @@ -308,32 +349,57 @@ final class EmitSource throw new \Error($message); } + $alreadyDisposed = $this->disposed; + $this->completed = !$disposed; // $disposed is false if complete() or fail() invoked $this->disposed = $this->disposed ?: $disposed; // Once disposed, do not change flag - if ($this->result) { + if ($this->completed) { // Record stack trace when calling complete() or fail() + \assert((function () { + if (isDebugEnabled()) { + $trace = \debug_backtrace(\DEBUG_BACKTRACE_IGNORE_ARGS); + \array_shift($trace); // remove current closure + $this->resolutionTrace = $trace; + } + + return true; + })()); + } + + if ($alreadyDisposed) { return; } - \assert((function () { - if (isDebugEnabled()) { - $trace = \debug_backtrace(\DEBUG_BACKTRACE_IGNORE_ARGS); - \array_shift($trace); // remove current closure - $this->resolutionTrace = $trace; - } - - return true; - })()); - - $this->result = $result; + $this->exception = $exception; $waiting = $this->waiting; $this->waiting = []; + $result = $exception ? Promise\fail($exception) : Promise\succeed(); + foreach ($waiting as $deferred) { $deferred->resolve($result); } + $onCompletion = $this->onCompletion; + $this->onCompletion = null; + + if (!$disposed) { + /** @psalm-suppress PossiblyNullIterator $alreadyDisposed is a guard against $this->onCompletion being null */ + foreach ($onCompletion as $callback) { + try { + $callback($exception); + } catch (\Throwable $e) { + Loop::defer(static function () use ($e) { + throw $e; + }); + } + } + } + + $onDisposal = $this->onDisposal; + $this->onDisposal = null; + if ($disposed) { $backPressure = $this->backPressure; $this->backPressure = []; @@ -342,10 +408,7 @@ final class EmitSource $deferred->resolve($result); } - $onDisposal = $this->onDisposal; - $this->onDisposal = null; - - /** @psalm-suppress PossiblyNullIterator $this->result is a guard against $this->onDisposal being null */ + /** @psalm-suppress PossiblyNullIterator $alreadyDisposed is a guard against $this->onDisposal being null */ foreach ($onDisposal as $callback) { try { $callback(); diff --git a/lib/Stream.php b/lib/Stream.php index 3717862..e17214d 100644 --- a/lib/Stream.php +++ b/lib/Stream.php @@ -27,4 +27,24 @@ interface Stream * @return void */ public function dispose(); + + /** + * Registers a callback to be invoked *only* if the stream is disposed before being completed or failed. + * + * @param callable():void $onDisposal + * + * @return void + */ + public function onDisposal(callable $onDisposal); + + /** + * Registers a callback to be invoked when the stream is completed or failed. If the stream is failed, the exception + * used to fail the stream is given as the first argument to the callback. Null is given as the first argument if + * the stream is completed. + * + * @param callable(?\Throwable):void $onCompletion + * + * @return void + */ + public function onCompletion(callable $onCompletion); } diff --git a/lib/StreamSource.php b/lib/StreamSource.php index 373f3c8..7d0662c 100644 --- a/lib/StreamSource.php +++ b/lib/StreamSource.php @@ -56,6 +56,17 @@ final class StreamSource return $this->source->isComplete(); } + /** + * @param callable(?\Throwable):void $onCompletion + * + * @return void + */ + public function onCompletion(callable $onCompletion) + { + $this->source->onCompletion($onCompletion); + } + + /** * @return bool True if the stream has been disposed. */ diff --git a/test/AsyncGeneratorTest.php b/test/AsyncGeneratorTest.php index 438d896..8856074 100644 --- a/test/AsyncGeneratorTest.php +++ b/test/AsyncGeneratorTest.php @@ -133,6 +133,8 @@ class AsyncGeneratorTest extends AsyncTestCase return $value; }); + $generator->onDisposal($this->createCallback(0)); + $generator->onCompletion($this->createCallback(1)); $this->assertSame(0, yield $generator->continue()); $this->assertNull(yield $generator->continue()); $this->assertSame($value, yield $generator->getReturn()); @@ -150,6 +152,11 @@ class AsyncGeneratorTest extends AsyncTestCase yield $yield(yield $deferred->promise()); }); + $generator->onDisposal($this->createCallback(0)); + $generator->onCompletion($this->createCallback(1, function (\Throwable $reason = null) use ($exception) { + $this->assertSame($exception, $reason); + })); + $deferred->fail($exception); try { @@ -224,6 +231,9 @@ class AsyncGeneratorTest extends AsyncTestCase yield $generator->continue(); + $generator->onDisposal($this->createCallback(1)); + $generator->onCompletion($this->createCallback(0)); + unset($generator); // Should call dispose() on the internal stream. $this->assertInstanceOf(DisposedException::class, $exception); @@ -246,8 +256,13 @@ class AsyncGeneratorTest extends AsyncTestCase yield $generator->continue(); + $generator->onDisposal($this->createCallback(1)); + $generator->onCompletion($this->createCallback(0)); + $generator->dispose(); + $generator->onDisposal($this->createCallback(1)); + $this->expectException(DisposedException::class); yield $generator->getReturn(); @@ -266,5 +281,8 @@ class AsyncGeneratorTest extends AsyncTestCase $this->assertSame(0, yield $generator->continue()); $this->assertTrue($invoked); + + $generator->onDisposal($this->createCallback(1)); + $generator->onCompletion($this->createCallback(0)); } } diff --git a/test/StreamSourceTest.php b/test/StreamSourceTest.php index 503ba21..c7eb38b 100644 --- a/test/StreamSourceTest.php +++ b/test/StreamSourceTest.php @@ -205,6 +205,42 @@ class StreamSourceTest extends AsyncTestCase $this->source->complete(); // Should throw. } + public function testOnDisposal() + { + $invoked = false; + $this->source->onDisposal(function () use (&$invoked) { + $invoked = true; + }); + + $this->assertFalse($invoked); + + $stream = $this->source->stream(); + $stream->dispose(); + + $this->assertTrue($invoked); + + $this->source->onDisposal($this->createCallback(1)); + } + + public function testOnDisposalAfterCompletion() + { + $invoked = false; + $this->source->onDisposal(function () use (&$invoked) { + $invoked = true; + }); + + $this->assertFalse($invoked); + + $this->source->complete(); + + $stream = $this->source->stream(); + $stream->dispose(); + + $this->assertFalse($invoked); + + $this->source->onDisposal($this->createCallback(0)); + } + public function testEmitAfterDisposal() { $this->expectException(DisposedException::class); @@ -213,8 +249,10 @@ class StreamSourceTest extends AsyncTestCase $stream = $this->source->stream(); $promise = $this->source->emit(1); $this->source->onDisposal($this->createCallback(1)); + $this->source->onCompletion($this->createCallback(0)); $stream->dispose(); $this->source->onDisposal($this->createCallback(1)); + $this->source->onCompletion($this->createCallback(0)); $this->assertTrue($this->source->isDisposed()); $this->assertNull(yield $promise); yield $this->source->emit(1); @@ -229,10 +267,63 @@ class StreamSourceTest extends AsyncTestCase $stream = $this->source->stream(); $promise = $this->source->emit(1); $this->source->onDisposal($this->createCallback(1)); + $this->source->onCompletion($this->createCallback(0)); unset($stream); $this->source->onDisposal($this->createCallback(1)); + $this->source->onCompletion($this->createCallback(0)); $this->assertTrue($this->source->isDisposed()); $this->assertNull(yield $promise); yield $this->source->emit(1); } + + public function testOnCompletionWithSuccessfulStream() + { + $invoked = false; + $this->source->onCompletion(function (\Throwable $exception = null) use (&$invoked) { + $this->assertNull($exception); + $invoked = true; + }); + + $this->source->onDisposal($this->createCallback(0)); + + $this->assertFalse($invoked); + + $this->source->complete(); + + $this->assertTrue($invoked); + + $this->source->onCompletion($this->createCallback(1, function (\Throwable $exception = null) { + $this->assertNull($exception); + })); + } + + public function testOnCompletionWithFailedStream() + { + $reason = new \Exception; + $invoked = false; + $this->source->onCompletion(function (\Throwable $exception = null) use (&$invoked, $reason) { + $this->assertSame($reason, $exception); + $invoked = true; + }); + + $this->source->onDisposal($this->createCallback(0)); + + $this->assertFalse($invoked); + + $this->source->fail($reason); + + $this->assertTrue($invoked); + + $this->source->onCompletion($this->createCallback(1, function (\Throwable $exception = null) use ($reason) { + $this->assertSame($reason, $exception); + })); + } + + public function testFailWithDisposedException() + { + $this->expectException(\Error::class); + $this->expectExceptionMessage('Cannot fail a stream with an instance of'); + + $this->source->fail(new DisposedException); + } }