From dd0f01f4dbca8e0554eccc80977e51d82c902ffd Mon Sep 17 00:00:00 2001 From: Aaron Piotrowski Date: Thu, 30 Jul 2020 14:33:39 -0500 Subject: [PATCH] Add onDisposal() and onCompletion() to Stream onCompletion callbacks are invoked if complete() or fail() is called before the stream is disposed. onDisposal callbacks are invoked if the stream is disposed before being completed or failed. Forbid failing a stream with DisposedException. --- lib/AsyncGenerator.php | 18 +++- lib/Internal/AutoDisposingStream.php | 16 +++ lib/Internal/EmitSource.php | 149 +++++++++++++++++++-------- lib/Stream.php | 20 ++++ lib/StreamSource.php | 11 ++ test/AsyncGeneratorTest.php | 18 ++++ test/StreamSourceTest.php | 91 ++++++++++++++++ 7 files changed, 279 insertions(+), 44 deletions(-) diff --git a/lib/AsyncGenerator.php b/lib/AsyncGenerator.php index 2fb99b4..c71f603 100644 --- a/lib/AsyncGenerator.php +++ b/lib/AsyncGenerator.php @@ -110,6 +110,22 @@ final class AsyncGenerator implements Stream $this->source->dispose(); } + /** + * @inheritDoc + */ + public function onCompletion(callable $onCompletion) + { + $this->source->onCompletion($onCompletion); + } + + /** + * @inheritDoc + */ + public function onDisposal(callable $onDisposal) + { + $this->source->onDisposal($onDisposal); + } + /** * @return Promise * @@ -127,7 +143,7 @@ final class AsyncGenerator implements Stream $source = $this->source; $this->coroutine->onResolve(static function ($exception) use ($source) { - if ($source->isComplete()) { + if ($source->isDisposed()) { return; // AsyncGenerator object was destroyed. } diff --git a/lib/Internal/AutoDisposingStream.php b/lib/Internal/AutoDisposingStream.php index 4b8f5ee..a7561cf 100644 --- a/lib/Internal/AutoDisposingStream.php +++ b/lib/Internal/AutoDisposingStream.php @@ -44,4 +44,20 @@ final class AutoDisposingStream implements Stream { $this->source->dispose(); } + + /** + * @inheritDoc + */ + public function onCompletion(callable $onCompletion) + { + $this->source->onCompletion($onCompletion); + } + + /** + * @inheritDoc + */ + public function onDisposal(callable $onDisposal) + { + $this->source->onDisposal($onDisposal); + } } diff --git a/lib/Internal/EmitSource.php b/lib/Internal/EmitSource.php index 01e883c..1d65c34 100644 --- a/lib/Internal/EmitSource.php +++ b/lib/Internal/EmitSource.php @@ -20,8 +20,8 @@ use React\Promise\PromiseInterface as ReactPromise; */ final class EmitSource { - /** @var Promise|null */ - private $result; + /** @var \Throwable|null */ + private $exception; /** @var bool */ private $completed = false; @@ -53,6 +53,9 @@ final class EmitSource /** @var bool */ private $used = false; + /** @var callable[]|null */ + private $onCompletion = []; + /** @var callable[]|null */ private $onDisposal = []; @@ -129,8 +132,12 @@ final class EmitSource return Promise\succeed($value); } - if ($this->result) { - return $this->result; + if ($this->exception) { + return Promise\fail($this->exception); + } + + if ($this->completed) { + return Promise\succeed(); } $this->waiting[$position] = $deferred = new Deferred; @@ -150,44 +157,73 @@ final class EmitSource } /** - * @see Stream::dispose() - * * @return void + * + * @see Stream::dispose() */ public function dispose() { - if ($this->result) { + if ($this->completed || $this->disposed) { return; // Stream already completed or failed. } - $this->finalize(Promise\fail(new DisposedException), true); + $this->finalize(new DisposedException, true); } /** - * @see Stream::onDisposal() - * * @param callable():void $onDispose * * @return void + * + * @see Stream::onDisposal() */ public function onDisposal(callable $onDisposal) { - if ($this->result) { - if ($this->disposed) { - try { - $onDisposal(); - } catch (\Throwable $e) { - Loop::defer(static function () use ($e) { - throw $e; - }); - } + if ($this->disposed) { + try { + $onDisposal(); + } catch (\Throwable $e) { + Loop::defer(static function () use ($e) { + throw $e; + }); } return; } + if ($this->completed) { + return; + } + $this->onDisposal[] = $onDisposal; } + /** + * @param callable(?\Throwable):void $onDispose + * + * @return void + * + * @see Stream::onCompletion() + */ + public function onCompletion(callable $onCompletion) + { + if ($this->completed) { + try { + $onCompletion($this->exception); + } catch (\Throwable $e) { + Loop::defer(static function () use ($e) { + throw $e; + }); + } + return; + } + + if ($this->disposed) { + return; + } + + $this->onCompletion[] = $onCompletion; + } + /** * Emits a value from the stream. The returned promise is resolved once the emitted value has been consumed or * if the stream is completed, failed, or disposed. @@ -206,9 +242,10 @@ final class EmitSource */ public function emit($value): Promise { - if ($this->result) { + if ($this->completed || $this->exception) { if ($this->disposed) { - return $this->result; // Promise failed with an instance of DisposedException. + \assert($this->exception instanceof DisposedException); + return Promise\fail($this->exception); } throw new \Error("Streams cannot emit values after calling complete"); @@ -262,14 +299,14 @@ final class EmitSource /** * Completes the stream. - ** + * * @return void * * @throws \Error If the iterator has already been completed. */ public function complete() { - $this->finalize(Promise\succeed()); + $this->finalize(); } /** @@ -281,16 +318,20 @@ final class EmitSource */ public function fail(\Throwable $exception) { - $this->finalize(Promise\fail($exception)); + if ($exception instanceof DisposedException) { + throw new \Error("Cannot fail a stream with an instance of " . DisposedException::class); + } + + $this->finalize($exception); } /** - * @param Promise $result Promise with the generator result, either a null success or a failed promise. - * @param bool $disposed Flag if the generator was disposed. + * @param \Throwable $exception + * @param bool $disposed Flag if the generator was disposed. * * @return void */ - private function finalize(Promise $result, bool $disposed = false) + private function finalize(\Throwable $exception = null, bool $disposed = false) { if ($this->completed) { $message = "Stream has already been completed"; @@ -308,32 +349,57 @@ final class EmitSource throw new \Error($message); } + $alreadyDisposed = $this->disposed; + $this->completed = !$disposed; // $disposed is false if complete() or fail() invoked $this->disposed = $this->disposed ?: $disposed; // Once disposed, do not change flag - if ($this->result) { + if ($this->completed) { // Record stack trace when calling complete() or fail() + \assert((function () { + if (isDebugEnabled()) { + $trace = \debug_backtrace(\DEBUG_BACKTRACE_IGNORE_ARGS); + \array_shift($trace); // remove current closure + $this->resolutionTrace = $trace; + } + + return true; + })()); + } + + if ($alreadyDisposed) { return; } - \assert((function () { - if (isDebugEnabled()) { - $trace = \debug_backtrace(\DEBUG_BACKTRACE_IGNORE_ARGS); - \array_shift($trace); // remove current closure - $this->resolutionTrace = $trace; - } - - return true; - })()); - - $this->result = $result; + $this->exception = $exception; $waiting = $this->waiting; $this->waiting = []; + $result = $exception ? Promise\fail($exception) : Promise\succeed(); + foreach ($waiting as $deferred) { $deferred->resolve($result); } + $onCompletion = $this->onCompletion; + $this->onCompletion = null; + + if (!$disposed) { + /** @psalm-suppress PossiblyNullIterator $alreadyDisposed is a guard against $this->onCompletion being null */ + foreach ($onCompletion as $callback) { + try { + $callback($exception); + } catch (\Throwable $e) { + Loop::defer(static function () use ($e) { + throw $e; + }); + } + } + } + + $onDisposal = $this->onDisposal; + $this->onDisposal = null; + if ($disposed) { $backPressure = $this->backPressure; $this->backPressure = []; @@ -342,10 +408,7 @@ final class EmitSource $deferred->resolve($result); } - $onDisposal = $this->onDisposal; - $this->onDisposal = null; - - /** @psalm-suppress PossiblyNullIterator $this->result is a guard against $this->onDisposal being null */ + /** @psalm-suppress PossiblyNullIterator $alreadyDisposed is a guard against $this->onDisposal being null */ foreach ($onDisposal as $callback) { try { $callback(); diff --git a/lib/Stream.php b/lib/Stream.php index 3717862..e17214d 100644 --- a/lib/Stream.php +++ b/lib/Stream.php @@ -27,4 +27,24 @@ interface Stream * @return void */ public function dispose(); + + /** + * Registers a callback to be invoked *only* if the stream is disposed before being completed or failed. + * + * @param callable():void $onDisposal + * + * @return void + */ + public function onDisposal(callable $onDisposal); + + /** + * Registers a callback to be invoked when the stream is completed or failed. If the stream is failed, the exception + * used to fail the stream is given as the first argument to the callback. Null is given as the first argument if + * the stream is completed. + * + * @param callable(?\Throwable):void $onCompletion + * + * @return void + */ + public function onCompletion(callable $onCompletion); } diff --git a/lib/StreamSource.php b/lib/StreamSource.php index 373f3c8..7d0662c 100644 --- a/lib/StreamSource.php +++ b/lib/StreamSource.php @@ -56,6 +56,17 @@ final class StreamSource return $this->source->isComplete(); } + /** + * @param callable(?\Throwable):void $onCompletion + * + * @return void + */ + public function onCompletion(callable $onCompletion) + { + $this->source->onCompletion($onCompletion); + } + + /** * @return bool True if the stream has been disposed. */ diff --git a/test/AsyncGeneratorTest.php b/test/AsyncGeneratorTest.php index 438d896..8856074 100644 --- a/test/AsyncGeneratorTest.php +++ b/test/AsyncGeneratorTest.php @@ -133,6 +133,8 @@ class AsyncGeneratorTest extends AsyncTestCase return $value; }); + $generator->onDisposal($this->createCallback(0)); + $generator->onCompletion($this->createCallback(1)); $this->assertSame(0, yield $generator->continue()); $this->assertNull(yield $generator->continue()); $this->assertSame($value, yield $generator->getReturn()); @@ -150,6 +152,11 @@ class AsyncGeneratorTest extends AsyncTestCase yield $yield(yield $deferred->promise()); }); + $generator->onDisposal($this->createCallback(0)); + $generator->onCompletion($this->createCallback(1, function (\Throwable $reason = null) use ($exception) { + $this->assertSame($exception, $reason); + })); + $deferred->fail($exception); try { @@ -224,6 +231,9 @@ class AsyncGeneratorTest extends AsyncTestCase yield $generator->continue(); + $generator->onDisposal($this->createCallback(1)); + $generator->onCompletion($this->createCallback(0)); + unset($generator); // Should call dispose() on the internal stream. $this->assertInstanceOf(DisposedException::class, $exception); @@ -246,8 +256,13 @@ class AsyncGeneratorTest extends AsyncTestCase yield $generator->continue(); + $generator->onDisposal($this->createCallback(1)); + $generator->onCompletion($this->createCallback(0)); + $generator->dispose(); + $generator->onDisposal($this->createCallback(1)); + $this->expectException(DisposedException::class); yield $generator->getReturn(); @@ -266,5 +281,8 @@ class AsyncGeneratorTest extends AsyncTestCase $this->assertSame(0, yield $generator->continue()); $this->assertTrue($invoked); + + $generator->onDisposal($this->createCallback(1)); + $generator->onCompletion($this->createCallback(0)); } } diff --git a/test/StreamSourceTest.php b/test/StreamSourceTest.php index 503ba21..c7eb38b 100644 --- a/test/StreamSourceTest.php +++ b/test/StreamSourceTest.php @@ -205,6 +205,42 @@ class StreamSourceTest extends AsyncTestCase $this->source->complete(); // Should throw. } + public function testOnDisposal() + { + $invoked = false; + $this->source->onDisposal(function () use (&$invoked) { + $invoked = true; + }); + + $this->assertFalse($invoked); + + $stream = $this->source->stream(); + $stream->dispose(); + + $this->assertTrue($invoked); + + $this->source->onDisposal($this->createCallback(1)); + } + + public function testOnDisposalAfterCompletion() + { + $invoked = false; + $this->source->onDisposal(function () use (&$invoked) { + $invoked = true; + }); + + $this->assertFalse($invoked); + + $this->source->complete(); + + $stream = $this->source->stream(); + $stream->dispose(); + + $this->assertFalse($invoked); + + $this->source->onDisposal($this->createCallback(0)); + } + public function testEmitAfterDisposal() { $this->expectException(DisposedException::class); @@ -213,8 +249,10 @@ class StreamSourceTest extends AsyncTestCase $stream = $this->source->stream(); $promise = $this->source->emit(1); $this->source->onDisposal($this->createCallback(1)); + $this->source->onCompletion($this->createCallback(0)); $stream->dispose(); $this->source->onDisposal($this->createCallback(1)); + $this->source->onCompletion($this->createCallback(0)); $this->assertTrue($this->source->isDisposed()); $this->assertNull(yield $promise); yield $this->source->emit(1); @@ -229,10 +267,63 @@ class StreamSourceTest extends AsyncTestCase $stream = $this->source->stream(); $promise = $this->source->emit(1); $this->source->onDisposal($this->createCallback(1)); + $this->source->onCompletion($this->createCallback(0)); unset($stream); $this->source->onDisposal($this->createCallback(1)); + $this->source->onCompletion($this->createCallback(0)); $this->assertTrue($this->source->isDisposed()); $this->assertNull(yield $promise); yield $this->source->emit(1); } + + public function testOnCompletionWithSuccessfulStream() + { + $invoked = false; + $this->source->onCompletion(function (\Throwable $exception = null) use (&$invoked) { + $this->assertNull($exception); + $invoked = true; + }); + + $this->source->onDisposal($this->createCallback(0)); + + $this->assertFalse($invoked); + + $this->source->complete(); + + $this->assertTrue($invoked); + + $this->source->onCompletion($this->createCallback(1, function (\Throwable $exception = null) { + $this->assertNull($exception); + })); + } + + public function testOnCompletionWithFailedStream() + { + $reason = new \Exception; + $invoked = false; + $this->source->onCompletion(function (\Throwable $exception = null) use (&$invoked, $reason) { + $this->assertSame($reason, $exception); + $invoked = true; + }); + + $this->source->onDisposal($this->createCallback(0)); + + $this->assertFalse($invoked); + + $this->source->fail($reason); + + $this->assertTrue($invoked); + + $this->source->onCompletion($this->createCallback(1, function (\Throwable $exception = null) use ($reason) { + $this->assertSame($reason, $exception); + })); + } + + public function testFailWithDisposedException() + { + $this->expectException(\Error::class); + $this->expectExceptionMessage('Cannot fail a stream with an instance of'); + + $this->source->fail(new DisposedException); + } }