1
0
mirror of https://github.com/danog/amp.git synced 2024-12-02 17:37:50 +01:00

Add onDisposal() and onCompletion() to Stream

onCompletion callbacks are invoked if complete() or fail() is called before the stream is disposed.

onDisposal callbacks are invoked if the stream is disposed before being completed or failed.

Forbid failing a stream with DisposedException.
This commit is contained in:
Aaron Piotrowski 2020-07-30 14:33:39 -05:00
parent 42d8ce764b
commit dd0f01f4db
No known key found for this signature in database
GPG Key ID: ADD1EF783EDE9EEB
7 changed files with 279 additions and 44 deletions

View File

@ -110,6 +110,22 @@ final class AsyncGenerator implements Stream
$this->source->dispose();
}
/**
* @inheritDoc
*/
public function onCompletion(callable $onCompletion)
{
$this->source->onCompletion($onCompletion);
}
/**
* @inheritDoc
*/
public function onDisposal(callable $onDisposal)
{
$this->source->onDisposal($onDisposal);
}
/**
* @return Promise<mixed>
*
@ -127,7 +143,7 @@ final class AsyncGenerator implements Stream
$source = $this->source;
$this->coroutine->onResolve(static function ($exception) use ($source) {
if ($source->isComplete()) {
if ($source->isDisposed()) {
return; // AsyncGenerator object was destroyed.
}

View File

@ -44,4 +44,20 @@ final class AutoDisposingStream implements Stream
{
$this->source->dispose();
}
/**
* @inheritDoc
*/
public function onCompletion(callable $onCompletion)
{
$this->source->onCompletion($onCompletion);
}
/**
* @inheritDoc
*/
public function onDisposal(callable $onDisposal)
{
$this->source->onDisposal($onDisposal);
}
}

View File

@ -20,8 +20,8 @@ use React\Promise\PromiseInterface as ReactPromise;
*/
final class EmitSource
{
/** @var Promise|null */
private $result;
/** @var \Throwable|null */
private $exception;
/** @var bool */
private $completed = false;
@ -53,6 +53,9 @@ final class EmitSource
/** @var bool */
private $used = false;
/** @var callable[]|null */
private $onCompletion = [];
/** @var callable[]|null */
private $onDisposal = [];
@ -129,8 +132,12 @@ final class EmitSource
return Promise\succeed($value);
}
if ($this->result) {
return $this->result;
if ($this->exception) {
return Promise\fail($this->exception);
}
if ($this->completed) {
return Promise\succeed();
}
$this->waiting[$position] = $deferred = new Deferred;
@ -150,44 +157,73 @@ final class EmitSource
}
/**
* @see Stream::dispose()
*
* @return void
*
* @see Stream::dispose()
*/
public function dispose()
{
if ($this->result) {
if ($this->completed || $this->disposed) {
return; // Stream already completed or failed.
}
$this->finalize(Promise\fail(new DisposedException), true);
$this->finalize(new DisposedException, true);
}
/**
* @see Stream::onDisposal()
*
* @param callable():void $onDispose
*
* @return void
*
* @see Stream::onDisposal()
*/
public function onDisposal(callable $onDisposal)
{
if ($this->result) {
if ($this->disposed) {
try {
$onDisposal();
} catch (\Throwable $e) {
Loop::defer(static function () use ($e) {
throw $e;
});
}
if ($this->disposed) {
try {
$onDisposal();
} catch (\Throwable $e) {
Loop::defer(static function () use ($e) {
throw $e;
});
}
return;
}
if ($this->completed) {
return;
}
$this->onDisposal[] = $onDisposal;
}
/**
* @param callable(?\Throwable):void $onDispose
*
* @return void
*
* @see Stream::onCompletion()
*/
public function onCompletion(callable $onCompletion)
{
if ($this->completed) {
try {
$onCompletion($this->exception);
} catch (\Throwable $e) {
Loop::defer(static function () use ($e) {
throw $e;
});
}
return;
}
if ($this->disposed) {
return;
}
$this->onCompletion[] = $onCompletion;
}
/**
* Emits a value from the stream. The returned promise is resolved once the emitted value has been consumed or
* if the stream is completed, failed, or disposed.
@ -206,9 +242,10 @@ final class EmitSource
*/
public function emit($value): Promise
{
if ($this->result) {
if ($this->completed || $this->exception) {
if ($this->disposed) {
return $this->result; // Promise failed with an instance of DisposedException.
\assert($this->exception instanceof DisposedException);
return Promise\fail($this->exception);
}
throw new \Error("Streams cannot emit values after calling complete");
@ -262,14 +299,14 @@ final class EmitSource
/**
* Completes the stream.
**
*
* @return void
*
* @throws \Error If the iterator has already been completed.
*/
public function complete()
{
$this->finalize(Promise\succeed());
$this->finalize();
}
/**
@ -281,16 +318,20 @@ final class EmitSource
*/
public function fail(\Throwable $exception)
{
$this->finalize(Promise\fail($exception));
if ($exception instanceof DisposedException) {
throw new \Error("Cannot fail a stream with an instance of " . DisposedException::class);
}
$this->finalize($exception);
}
/**
* @param Promise $result Promise with the generator result, either a null success or a failed promise.
* @param bool $disposed Flag if the generator was disposed.
* @param \Throwable $exception
* @param bool $disposed Flag if the generator was disposed.
*
* @return void
*/
private function finalize(Promise $result, bool $disposed = false)
private function finalize(\Throwable $exception = null, bool $disposed = false)
{
if ($this->completed) {
$message = "Stream has already been completed";
@ -308,32 +349,57 @@ final class EmitSource
throw new \Error($message);
}
$alreadyDisposed = $this->disposed;
$this->completed = !$disposed; // $disposed is false if complete() or fail() invoked
$this->disposed = $this->disposed ?: $disposed; // Once disposed, do not change flag
if ($this->result) {
if ($this->completed) { // Record stack trace when calling complete() or fail()
\assert((function () {
if (isDebugEnabled()) {
$trace = \debug_backtrace(\DEBUG_BACKTRACE_IGNORE_ARGS);
\array_shift($trace); // remove current closure
$this->resolutionTrace = $trace;
}
return true;
})());
}
if ($alreadyDisposed) {
return;
}
\assert((function () {
if (isDebugEnabled()) {
$trace = \debug_backtrace(\DEBUG_BACKTRACE_IGNORE_ARGS);
\array_shift($trace); // remove current closure
$this->resolutionTrace = $trace;
}
return true;
})());
$this->result = $result;
$this->exception = $exception;
$waiting = $this->waiting;
$this->waiting = [];
$result = $exception ? Promise\fail($exception) : Promise\succeed();
foreach ($waiting as $deferred) {
$deferred->resolve($result);
}
$onCompletion = $this->onCompletion;
$this->onCompletion = null;
if (!$disposed) {
/** @psalm-suppress PossiblyNullIterator $alreadyDisposed is a guard against $this->onCompletion being null */
foreach ($onCompletion as $callback) {
try {
$callback($exception);
} catch (\Throwable $e) {
Loop::defer(static function () use ($e) {
throw $e;
});
}
}
}
$onDisposal = $this->onDisposal;
$this->onDisposal = null;
if ($disposed) {
$backPressure = $this->backPressure;
$this->backPressure = [];
@ -342,10 +408,7 @@ final class EmitSource
$deferred->resolve($result);
}
$onDisposal = $this->onDisposal;
$this->onDisposal = null;
/** @psalm-suppress PossiblyNullIterator $this->result is a guard against $this->onDisposal being null */
/** @psalm-suppress PossiblyNullIterator $alreadyDisposed is a guard against $this->onDisposal being null */
foreach ($onDisposal as $callback) {
try {
$callback();

View File

@ -27,4 +27,24 @@ interface Stream
* @return void
*/
public function dispose();
/**
* Registers a callback to be invoked *only* if the stream is disposed before being completed or failed.
*
* @param callable():void $onDisposal
*
* @return void
*/
public function onDisposal(callable $onDisposal);
/**
* Registers a callback to be invoked when the stream is completed or failed. If the stream is failed, the exception
* used to fail the stream is given as the first argument to the callback. Null is given as the first argument if
* the stream is completed.
*
* @param callable(?\Throwable):void $onCompletion
*
* @return void
*/
public function onCompletion(callable $onCompletion);
}

View File

@ -56,6 +56,17 @@ final class StreamSource
return $this->source->isComplete();
}
/**
* @param callable(?\Throwable):void $onCompletion
*
* @return void
*/
public function onCompletion(callable $onCompletion)
{
$this->source->onCompletion($onCompletion);
}
/**
* @return bool True if the stream has been disposed.
*/

View File

@ -133,6 +133,8 @@ class AsyncGeneratorTest extends AsyncTestCase
return $value;
});
$generator->onDisposal($this->createCallback(0));
$generator->onCompletion($this->createCallback(1));
$this->assertSame(0, yield $generator->continue());
$this->assertNull(yield $generator->continue());
$this->assertSame($value, yield $generator->getReturn());
@ -150,6 +152,11 @@ class AsyncGeneratorTest extends AsyncTestCase
yield $yield(yield $deferred->promise());
});
$generator->onDisposal($this->createCallback(0));
$generator->onCompletion($this->createCallback(1, function (\Throwable $reason = null) use ($exception) {
$this->assertSame($exception, $reason);
}));
$deferred->fail($exception);
try {
@ -224,6 +231,9 @@ class AsyncGeneratorTest extends AsyncTestCase
yield $generator->continue();
$generator->onDisposal($this->createCallback(1));
$generator->onCompletion($this->createCallback(0));
unset($generator); // Should call dispose() on the internal stream.
$this->assertInstanceOf(DisposedException::class, $exception);
@ -246,8 +256,13 @@ class AsyncGeneratorTest extends AsyncTestCase
yield $generator->continue();
$generator->onDisposal($this->createCallback(1));
$generator->onCompletion($this->createCallback(0));
$generator->dispose();
$generator->onDisposal($this->createCallback(1));
$this->expectException(DisposedException::class);
yield $generator->getReturn();
@ -266,5 +281,8 @@ class AsyncGeneratorTest extends AsyncTestCase
$this->assertSame(0, yield $generator->continue());
$this->assertTrue($invoked);
$generator->onDisposal($this->createCallback(1));
$generator->onCompletion($this->createCallback(0));
}
}

View File

@ -205,6 +205,42 @@ class StreamSourceTest extends AsyncTestCase
$this->source->complete(); // Should throw.
}
public function testOnDisposal()
{
$invoked = false;
$this->source->onDisposal(function () use (&$invoked) {
$invoked = true;
});
$this->assertFalse($invoked);
$stream = $this->source->stream();
$stream->dispose();
$this->assertTrue($invoked);
$this->source->onDisposal($this->createCallback(1));
}
public function testOnDisposalAfterCompletion()
{
$invoked = false;
$this->source->onDisposal(function () use (&$invoked) {
$invoked = true;
});
$this->assertFalse($invoked);
$this->source->complete();
$stream = $this->source->stream();
$stream->dispose();
$this->assertFalse($invoked);
$this->source->onDisposal($this->createCallback(0));
}
public function testEmitAfterDisposal()
{
$this->expectException(DisposedException::class);
@ -213,8 +249,10 @@ class StreamSourceTest extends AsyncTestCase
$stream = $this->source->stream();
$promise = $this->source->emit(1);
$this->source->onDisposal($this->createCallback(1));
$this->source->onCompletion($this->createCallback(0));
$stream->dispose();
$this->source->onDisposal($this->createCallback(1));
$this->source->onCompletion($this->createCallback(0));
$this->assertTrue($this->source->isDisposed());
$this->assertNull(yield $promise);
yield $this->source->emit(1);
@ -229,10 +267,63 @@ class StreamSourceTest extends AsyncTestCase
$stream = $this->source->stream();
$promise = $this->source->emit(1);
$this->source->onDisposal($this->createCallback(1));
$this->source->onCompletion($this->createCallback(0));
unset($stream);
$this->source->onDisposal($this->createCallback(1));
$this->source->onCompletion($this->createCallback(0));
$this->assertTrue($this->source->isDisposed());
$this->assertNull(yield $promise);
yield $this->source->emit(1);
}
public function testOnCompletionWithSuccessfulStream()
{
$invoked = false;
$this->source->onCompletion(function (\Throwable $exception = null) use (&$invoked) {
$this->assertNull($exception);
$invoked = true;
});
$this->source->onDisposal($this->createCallback(0));
$this->assertFalse($invoked);
$this->source->complete();
$this->assertTrue($invoked);
$this->source->onCompletion($this->createCallback(1, function (\Throwable $exception = null) {
$this->assertNull($exception);
}));
}
public function testOnCompletionWithFailedStream()
{
$reason = new \Exception;
$invoked = false;
$this->source->onCompletion(function (\Throwable $exception = null) use (&$invoked, $reason) {
$this->assertSame($reason, $exception);
$invoked = true;
});
$this->source->onDisposal($this->createCallback(0));
$this->assertFalse($invoked);
$this->source->fail($reason);
$this->assertTrue($invoked);
$this->source->onCompletion($this->createCallback(1, function (\Throwable $exception = null) use ($reason) {
$this->assertSame($reason, $exception);
}));
}
public function testFailWithDisposedException()
{
$this->expectException(\Error::class);
$this->expectExceptionMessage('Cannot fail a stream with an instance of');
$this->source->fail(new DisposedException);
}
}